From mboxrd@z Thu Jan 1 00:00:00 1970
To: tarantool-patches@dev.tarantool.org, vdavydov@tarantool.org, v.shpilevoy@tarantool.org
Cc: mechanik20051988
Date: Wed, 11 Aug 2021 11:56:56 +0300
Message-Id: <56db4eea5c868187e151658427e1a6d65736771a.1628671235.git.mechanik20051988@tarantool.org>
X-Mailer: git-send-email 2.20.1
MIME-Version: 1.0
Content-Transfer-Encoding: 8bit
Subject: [Tarantool-patches] [PATCH v3 6/8] net.box: 
add stream support to net.box
List-Id: Tarantool development patches
From: mechanik20051988 via Tarantool-patches
Reply-To: mechanik20051988
Sender: "Tarantool-patches"

From: mechanik20051988

Add stream support to `net.box`. In "net.box", a stream is an object
built on top of a connection. It has the same methods as the
connection, but all requests made through it are sent with a non-zero
stream ID. Since there can be a lot of streams, we do not copy the
spaces from the connection to the stream when the stream is created.
Instead, a space is wrapped only when it is first accessed. Schema
updates are handled lazily as well: each stream has its own
schema_version. On every access to a stream space we compare the
stream schema_version with the connection schema_version. If they
differ, we clear the stream space cache, update the stream
schema_version, and wrap the space being accessed into the stream
cache.

Part of #5860

@TarantoolBot document
Title: stream support was added to net.box

In "net.box", a stream is an object built on top of a connection. It
has the same methods as the connection, but all requests made through
it are sent with a non-zero stream ID. The stream ID is generated on
the client automatically. A simple example of stream creation using
net.box:
```lua
stream = conn:new_stream()
-- all connection methods are valid, but send requests
-- with a non-zero stream_id.
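-- Hedged usage sketch, not part of the patch itself. It assumes that
-- `conn` is an established net.box connection and that the server has
-- a space named 'test' with a primary index; both are assumptions made
-- only for illustration.
-- The stream space wrapper is created lazily, on this first access.
test = stream.space.test
-- Requests made through the stream and its spaces are expected to
-- carry the stream ID.
test:replace({1, 'via stream'})
test:select({1})
stream:ping()
-- Each call to new_stream() on the same connection is expected to
-- produce a stream with a new ID.
second = conn:new_stream()
-- Requests made directly through the connection are expected to be
-- sent without a stream ID, as before.
conn.space.test:select({1})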
``` --- src/box/lua/net_box.c | 95 ++-- src/box/lua/net_box.lua | 205 ++++++-- test/box/access.result | 6 +- test/box/access.test.lua | 6 +- ...net.box_console_connections_gh-2677.result | 2 +- ...t.box_console_connections_gh-2677.test.lua | 2 +- .../net.box_incorrect_iterator_gh-841.result | 4 +- ...net.box_incorrect_iterator_gh-841.test.lua | 4 +- test/box/net.box_iproto_hangs_gh-3464.result | 2 +- .../box/net.box_iproto_hangs_gh-3464.test.lua | 2 +- .../net.box_long-poll_input_gh-3400.result | 8 +- .../net.box_long-poll_input_gh-3400.test.lua | 8 +- test/box/stream.lua | 13 + test/box/stream.result | 485 ++++++++++++++++++ test/box/stream.test.lua | 182 +++++++ test/box/suite.ini | 2 +- 16 files changed, 934 insertions(+), 92 deletions(-) create mode 100644 test/box/stream.lua create mode 100644 test/box/stream.result create mode 100644 test/box/stream.test.lua diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c index 06e574cdf..3bc49af23 100644 --- a/src/box/lua/net_box.c +++ b/src/box/lua/net_box.c @@ -328,7 +328,7 @@ netbox_registry_reset(struct netbox_registry *registry, struct error *error) static inline size_t netbox_begin_encode(struct mpstream *stream, uint64_t sync, - enum iproto_type type) + enum iproto_type type, uint64_t stream_id) { /* Remember initial size of ibuf (see netbox_end_encode()) */ struct ibuf *ibuf = stream->ctx; @@ -340,7 +340,7 @@ netbox_begin_encode(struct mpstream *stream, uint64_t sync, mpstream_advance(stream, fixheader_size); /* encode header */ - mpstream_encode_map(stream, 2); + mpstream_encode_map(stream, stream_id != 0 ? 
3 : 2); mpstream_encode_uint(stream, IPROTO_SYNC); mpstream_encode_uint(stream, sync); @@ -348,6 +348,10 @@ netbox_begin_encode(struct mpstream *stream, uint64_t sync, mpstream_encode_uint(stream, IPROTO_REQUEST_TYPE); mpstream_encode_uint(stream, type); + if (stream_id != 0) { + mpstream_encode_uint(stream, IPROTO_STREAM_ID); + mpstream_encode_uint(stream, stream_id); + } /* Caller should remember how many bytes was used in ibuf */ return used; } @@ -380,11 +384,11 @@ netbox_end_encode(struct mpstream *stream, size_t initial_size) static void netbox_encode_ping(lua_State *L, int idx, struct mpstream *stream, - uint64_t sync) + uint64_t sync, uint64_t stream_id) { (void)L; (void)idx; - size_t svp = netbox_begin_encode(stream, sync, IPROTO_PING); + size_t svp = netbox_begin_encode(stream, sync, IPROTO_PING, stream_id); netbox_end_encode(stream, svp); } @@ -402,7 +406,7 @@ netbox_encode_auth(struct ibuf *ibuf, uint64_t sync, struct mpstream stream; mpstream_init(&stream, ibuf, ibuf_reserve_cb, ibuf_alloc_cb, mpstream_error_handler, &is_error); - size_t svp = netbox_begin_encode(&stream, sync, IPROTO_AUTH); + size_t svp = netbox_begin_encode(&stream, sync, IPROTO_AUTH, 0); /* Adapted from xrow_encode_auth() */ mpstream_encode_map(&stream, password != NULL ? 
2 : 1); @@ -432,7 +436,7 @@ netbox_encode_select_all(struct ibuf *ibuf, uint64_t sync, uint32_t space_id) struct mpstream stream; mpstream_init(&stream, ibuf, ibuf_reserve_cb, ibuf_alloc_cb, mpstream_error_handler, &is_error); - size_t svp = netbox_begin_encode(&stream, sync, IPROTO_SELECT); + size_t svp = netbox_begin_encode(&stream, sync, IPROTO_SELECT, 0); mpstream_encode_map(&stream, 3); mpstream_encode_uint(&stream, IPROTO_SPACE_ID); mpstream_encode_uint(&stream, space_id); @@ -446,10 +450,10 @@ netbox_encode_select_all(struct ibuf *ibuf, uint64_t sync, uint32_t space_id) static void netbox_encode_call_impl(lua_State *L, int idx, struct mpstream *stream, - uint64_t sync, enum iproto_type type) + uint64_t sync, enum iproto_type type, uint64_t stream_id) { /* Lua stack at idx: function_name, args */ - size_t svp = netbox_begin_encode(stream, sync, type); + size_t svp = netbox_begin_encode(stream, sync, type, stream_id); mpstream_encode_map(stream, 2); @@ -468,24 +472,25 @@ netbox_encode_call_impl(lua_State *L, int idx, struct mpstream *stream, static void netbox_encode_call_16(lua_State *L, int idx, struct mpstream *stream, - uint64_t sync) + uint64_t sync, uint64_t stream_id) { - netbox_encode_call_impl(L, idx, stream, sync, IPROTO_CALL_16); + netbox_encode_call_impl(L, idx, stream, sync, + IPROTO_CALL_16, stream_id); } static void netbox_encode_call(lua_State *L, int idx, struct mpstream *stream, - uint64_t sync) + uint64_t sync, uint64_t stream_id) { - netbox_encode_call_impl(L, idx, stream, sync, IPROTO_CALL); + netbox_encode_call_impl(L, idx, stream, sync, IPROTO_CALL, stream_id); } static void netbox_encode_eval(lua_State *L, int idx, struct mpstream *stream, - uint64_t sync) + uint64_t sync, uint64_t stream_id) { /* Lua stack at idx: expr, args */ - size_t svp = netbox_begin_encode(stream, sync, IPROTO_EVAL); + size_t svp = netbox_begin_encode(stream, sync, IPROTO_EVAL, stream_id); mpstream_encode_map(stream, 2); @@ -504,10 +509,11 @@ 
netbox_encode_eval(lua_State *L, int idx, struct mpstream *stream, static void netbox_encode_select(lua_State *L, int idx, struct mpstream *stream, - uint64_t sync) + uint64_t sync, uint64_t stream_id) { /* Lua stack at idx: space_id, index_id, iterator, offset, limit, key */ - size_t svp = netbox_begin_encode(stream, sync, IPROTO_SELECT); + size_t svp = netbox_begin_encode(stream, sync, IPROTO_SELECT, + stream_id); mpstream_encode_map(stream, 6); @@ -546,10 +552,11 @@ netbox_encode_select(lua_State *L, int idx, struct mpstream *stream, static void netbox_encode_insert_or_replace(lua_State *L, int idx, struct mpstream *stream, - uint64_t sync, enum iproto_type type) + uint64_t sync, enum iproto_type type, + uint64_t stream_id) { /* Lua stack at idx: space_id, tuple */ - size_t svp = netbox_begin_encode(stream, sync, type); + size_t svp = netbox_begin_encode(stream, sync, type, stream_id); mpstream_encode_map(stream, 2); @@ -567,24 +574,27 @@ netbox_encode_insert_or_replace(lua_State *L, int idx, struct mpstream *stream, static void netbox_encode_insert(lua_State *L, int idx, struct mpstream *stream, - uint64_t sync) + uint64_t sync, uint64_t stream_id) { - netbox_encode_insert_or_replace(L, idx, stream, sync, IPROTO_INSERT); + netbox_encode_insert_or_replace(L, idx, stream, sync, + IPROTO_INSERT, stream_id); } static void netbox_encode_replace(lua_State *L, int idx, struct mpstream *stream, - uint64_t sync) + uint64_t sync, uint64_t stream_id) { - netbox_encode_insert_or_replace(L, idx, stream, sync, IPROTO_REPLACE); + netbox_encode_insert_or_replace(L, idx, stream, sync, + IPROTO_REPLACE, stream_id); } static void netbox_encode_delete(lua_State *L, int idx, struct mpstream *stream, - uint64_t sync) + uint64_t sync, uint64_t stream_id) { /* Lua stack at idx: space_id, index_id, key */ - size_t svp = netbox_begin_encode(stream, sync, IPROTO_DELETE); + size_t svp = netbox_begin_encode(stream, sync, IPROTO_DELETE, + stream_id); mpstream_encode_map(stream, 3); @@ 
-607,10 +617,11 @@ netbox_encode_delete(lua_State *L, int idx, struct mpstream *stream, static void netbox_encode_update(lua_State *L, int idx, struct mpstream *stream, - uint64_t sync) + uint64_t sync, uint64_t stream_id) { /* Lua stack at idx: space_id, index_id, key, ops */ - size_t svp = netbox_begin_encode(stream, sync, IPROTO_UPDATE); + size_t svp = netbox_begin_encode(stream, sync, IPROTO_UPDATE, + stream_id); mpstream_encode_map(stream, 5); @@ -641,10 +652,11 @@ netbox_encode_update(lua_State *L, int idx, struct mpstream *stream, static void netbox_encode_upsert(lua_State *L, int idx, struct mpstream *stream, - uint64_t sync) + uint64_t sync, uint64_t stream_id) { /* Lua stack at idx: space_id, tuple, ops */ - size_t svp = netbox_begin_encode(stream, sync, IPROTO_UPSERT); + size_t svp = netbox_begin_encode(stream, sync, IPROTO_UPSERT, + stream_id); mpstream_encode_map(stream, 4); @@ -844,10 +856,11 @@ netbox_send_and_recv_console(int fd, struct ibuf *send_buf, static void netbox_encode_execute(lua_State *L, int idx, struct mpstream *stream, - uint64_t sync) + uint64_t sync, uint64_t stream_id) { /* Lua stack at idx: query, parameters, options */ - size_t svp = netbox_begin_encode(stream, sync, IPROTO_EXECUTE); + size_t svp = netbox_begin_encode(stream, sync, IPROTO_EXECUTE, + stream_id); mpstream_encode_map(stream, 3); @@ -873,10 +886,11 @@ netbox_encode_execute(lua_State *L, int idx, struct mpstream *stream, static void netbox_encode_prepare(lua_State *L, int idx, struct mpstream *stream, - uint64_t sync) + uint64_t sync, uint64_t stream_id) { /* Lua stack at idx: query */ - size_t svp = netbox_begin_encode(stream, sync, IPROTO_PREPARE); + size_t svp = netbox_begin_encode(stream, sync, IPROTO_PREPARE, + stream_id); mpstream_encode_map(stream, 1); @@ -896,18 +910,19 @@ netbox_encode_prepare(lua_State *L, int idx, struct mpstream *stream, static void netbox_encode_unprepare(lua_State *L, int idx, struct mpstream *stream, - uint64_t sync) + uint64_t sync, 
uint64_t stream_id) { /* Lua stack at idx: query, parameters, options */ - netbox_encode_prepare(L, idx, stream, sync); + netbox_encode_prepare(L, idx, stream, sync, stream_id); } static void netbox_encode_inject(struct lua_State *L, int idx, struct mpstream *stream, - uint64_t sync) + uint64_t sync, uint64_t stream_id) { /* Lua stack at idx: bytes */ (void)sync; + (void)stream_id; size_t len; const char *data = lua_tolstring(L, idx, &len); mpstream_memcpy(stream, data, len); @@ -921,11 +936,11 @@ netbox_encode_inject(struct lua_State *L, int idx, struct mpstream *stream, */ static int netbox_encode_method(struct lua_State *L, int idx, enum netbox_method method, - struct ibuf *ibuf, uint64_t sync) + struct ibuf *ibuf, uint64_t sync, uint64_t stream_id) { typedef void (*method_encoder_f)(struct lua_State *L, int idx, struct mpstream *stream, - uint64_t sync); + uint64_t sync, uint64_t stream_id); static method_encoder_f method_encoder[] = { [NETBOX_PING] = netbox_encode_ping, [NETBOX_CALL_16] = netbox_encode_call_16, @@ -949,7 +964,7 @@ netbox_encode_method(struct lua_State *L, int idx, enum netbox_method method, struct mpstream stream; mpstream_init(&stream, ibuf, ibuf_reserve_cb, ibuf_alloc_cb, luamp_error, L); - method_encoder[method](L, idx, &stream, sync); + method_encoder[method](L, idx, &stream, sync, stream_id); return 0; } @@ -1569,6 +1584,7 @@ netbox_new_registry(struct lua_State *L) * - on_push: on_push trigger function * - on_push_ctx: on_push trigger function argument * - format: tuple format to use for decoding the body or nil + * - stream_id: determines whether or not the request belongs to stream * - ...: method-specific arguments passed to the encoder */ static void @@ -1581,7 +1597,8 @@ netbox_make_request(struct lua_State *L, int idx, enum netbox_method method = lua_tointeger(L, idx + 4); assert(method < netbox_method_MAX); uint64_t sync = registry->next_sync++; - netbox_encode_method(L, idx + 8, method, send_buf, sync); + uint64_t stream_id = 
luaL_touint64(L, idx + 8); + netbox_encode_method(L, idx + 9, method, send_buf, sync, stream_id); /* Initialize and register the request object. */ request->method = method; diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua index 8f5671c15..3dffc245f 100644 --- a/src/box/lua/net_box.lua +++ b/src/box/lua/net_box.lua @@ -275,14 +275,14 @@ local function create_transport(host, port, user, password, callback, -- @retval not nil Future object. -- local function perform_async_request(buffer, skip_header, method, on_push, - on_push_ctx, format, ...) + on_push_ctx, format, stream_id, ...) local err = prepare_perform_request() if err then return nil, err end return perform_async_request_impl(requests, send_buf, buffer, skip_header, method, on_push, - on_push_ctx, format, ...) + on_push_ctx, format, stream_id, ...) end -- @@ -291,14 +291,15 @@ local function create_transport(host, port, user, password, callback, -- @retval not nil Response object. -- local function perform_request(timeout, buffer, skip_header, method, - on_push, on_push_ctx, format, ...) + on_push, on_push_ctx, format, + stream_id, ...) local err = prepare_perform_request() if err then return nil, err end return perform_request_impl(timeout, requests, send_buf, buffer, skip_header, method, on_push, on_push_ctx, - format, ...) + format, stream_id, ...) 
end -- PROTOCOL STATE MACHINE (WORKER FIBER) -- @@ -487,6 +488,37 @@ local function remote_serialize(self) } end +local function stream_serialize(self) + return { + host = self._conn.host, + port = self._conn.port, + opts = next(self._conn.opts) and self._conn.opts, + state = self._conn.state, + error = self._conn.error, + protocol = self._conn.protocol, + schema_version = self._conn.schema_version, + peer_uuid = self._conn.peer_uuid, + peer_version_id = self._conn.peer_version_id, + stream_id = self._stream_id + } +end + +local function stream_spaces_serialize(self) + return self._stream._conn.space +end + +local function stream_space_serialize(self) + return self._src +end + +local function stream_indexes_serialize(self) + return self._space._src.index +end + +local function stream_index_serialize(self) + return self._src +end + local remote_methods = {} local remote_mt = { __index = remote_methods, __serialize = remote_serialize, @@ -499,6 +531,86 @@ local console_mt = { __metatable = false } +-- Create stream space index, which is same as connection space +-- index, but have non zero stream ID. +local function stream_wrap_index(stream_id, src) + return setmetatable({ + _stream_id = stream_id, + _src = src, + }, { + __index = src, + __serialize = stream_index_serialize + }) +end + +-- Metatable for stream space indexes. When stream space being +-- created there are no indexes in it. When accessing the space +-- index, we look for corresponding space index in corresponding +-- connection space. If it is found we create same index for the +-- stream space but with corresponding stream ID. We do not need +-- to compare stream _schema_version and connection schema_version, +-- because all access to index is carried out through it's space. +-- So we update schema_version when we access space. 
+local stream_indexes_mt = { + __index = function(self, key) + local _space = self._space + local src = _space._src.index[key] + if not src then + return nil + end + local res = stream_wrap_index(_space._stream_id, src) + self[key] = res + return res + end, + __serialize = stream_indexes_serialize +} + +-- Create stream space, which is same as connection space, +-- but have non zero stream ID. +local function stream_wrap_space(stream, src) + local res = setmetatable({ + _stream_id = stream._stream_id, + _src = src, + index = setmetatable({ + _space = nil, + }, stream_indexes_mt) + }, { + __index = src, + __serialize = stream_space_serialize + }) + res.index._space = res + return res +end + +-- Metatable for stream spaces. When stream being created there +-- are no spaces in it. When user try to access some space in +-- stream, we first of all compare _schema_version of stream with +-- schema_version from connection and if they are not equal, we +-- clear stream space cache and update it's schema_version. Then +-- we look for corresponding space in the connection. If it is +-- found we create same space for the stream but with corresponding +-- stream ID. 
+local stream_spaces_mt = { + __index = function(self, key) + local stream = self._stream + if stream._schema_version ~= stream._conn.schema_version then + stream._schema_version = stream._conn.schema_version + self._stream_space_cache = {} + end + if self._stream_space_cache[key] then + return self._stream_space_cache[key] + end + local src = stream._conn.space[key] + if not src then + return nil + end + local res = stream_wrap_space(stream, src) + self._stream_space_cache[key] = res + return res + end, + __serialize = stream_spaces_serialize +} + local space_metatable, index_metatable local function new_sm(host, port, opts, connection, greeting) @@ -578,6 +690,8 @@ local function new_sm(host, port, opts, connection, greeting) if opts.wait_connected ~= false then remote._transport.wait_state('active', tonumber(opts.wait_connected)) end + -- Last stream ID used for this connection + remote._last_stream_id = 0 return remote end @@ -635,6 +749,28 @@ local function check_eval_args(args) end end +local function new_stream(stream) + check_remote_arg(stream, 'new_stream') + return stream._conn:new_stream() +end + +function remote_methods:new_stream() + check_remote_arg(self, 'new_stream') + self._last_stream_id = self._last_stream_id + 1 + local stream = setmetatable({ + new_stream = new_stream, + _stream_id = self._last_stream_id, + space = setmetatable({ + _stream_space_cache = {}, + _stream = nil, + }, stream_spaces_mt), + _conn = self, + _schema_version = self.schema_version, + }, { __index = self, __serialize = stream_serialize }) + stream.space._stream = stream + return stream +end + function remote_methods:close() check_remote_arg(self, 'close') self._transport.stop() @@ -665,7 +801,7 @@ function remote_methods:wait_connected(timeout) return self._transport.wait_state('active', timeout) end -function remote_methods:_request(method, opts, format, ...) +function remote_methods:_request(method, opts, format, stream_id, ...) 
local transport = self._transport local on_push, on_push_ctx, buffer, skip_header, deadline -- Extract options, set defaults, check if the request is @@ -680,7 +816,7 @@ function remote_methods:_request(method, opts, format, ...) local res, err = transport.perform_async_request(buffer, skip_header, method, table.insert, {}, format, - ...) + stream_id, ...) if err then box.error(err) end @@ -702,7 +838,7 @@ function remote_methods:_request(method, opts, format, ...) end local res, err = transport.perform_request(timeout, buffer, skip_header, method, on_push, on_push_ctx, - format, ...) + format, stream_id, ...) if err then box.error(err) end @@ -718,7 +854,7 @@ end function remote_methods:ping(opts) check_remote_arg(self, 'ping') - return (pcall(self._request, self, M_PING, opts)) + return (pcall(self._request, self, M_PING, opts, nil, self._stream_id)) end function remote_methods:reload_schema() @@ -729,14 +865,16 @@ end -- @deprecated since 1.7.4 function remote_methods:call_16(func_name, ...) check_remote_arg(self, 'call') - return (self:_request(M_CALL_16, nil, nil, tostring(func_name), {...})) + return (self:_request(M_CALL_16, nil, nil, self._stream_id, + tostring(func_name), {...})) end function remote_methods:call(func_name, args, opts) check_remote_arg(self, 'call') check_call_args(args) args = args or {} - local res = self:_request(M_CALL_17, opts, nil, tostring(func_name), args) + local res = self:_request(M_CALL_17, opts, nil, self._stream_id, + tostring(func_name), args) if type(res) ~= 'table' or opts and opts.is_async then return res end @@ -746,14 +884,15 @@ end -- @deprecated since 1.7.4 function remote_methods:eval_16(code, ...) 
check_remote_arg(self, 'eval') - return unpack((self:_request(M_EVAL, nil, nil, code, {...}))) + return unpack((self:_request(M_EVAL, nil, nil, self._stream_id, + code, {...}))) end function remote_methods:eval(code, args, opts) check_remote_arg(self, 'eval') check_eval_args(args) args = args or {} - local res = self:_request(M_EVAL, opts, nil, code, args) + local res = self:_request(M_EVAL, opts, nil, self._stream_id, code, args) if type(res) ~= 'table' or opts and opts.is_async then return res end @@ -765,8 +904,8 @@ function remote_methods:execute(query, parameters, sql_opts, netbox_opts) if sql_opts ~= nil then box.error(box.error.UNSUPPORTED, "execute", "options") end - return self:_request(M_EXECUTE, netbox_opts, nil, query, parameters or {}, - sql_opts or {}) + return self:_request(M_EXECUTE, netbox_opts, nil, self._stream_id, + query, parameters or {}, sql_opts or {}) end function remote_methods:prepare(query, parameters, sql_opts, netbox_opts) -- luacheck: no unused args @@ -777,7 +916,7 @@ function remote_methods:prepare(query, parameters, sql_opts, netbox_opts) -- lua if sql_opts ~= nil then box.error(box.error.UNSUPPORTED, "prepare", "options") end - return self:_request(M_PREPARE, netbox_opts, nil, query) + return self:_request(M_PREPARE, netbox_opts, nil, self._stream_id, query) end function remote_methods:unprepare(query, parameters, sql_opts, netbox_opts) @@ -788,8 +927,8 @@ function remote_methods:unprepare(query, parameters, sql_opts, netbox_opts) if sql_opts ~= nil then box.error(box.error.UNSUPPORTED, "unprepare", "options") end - return self:_request(M_UNPREPARE, netbox_opts, nil, query, parameters or {}, - sql_opts or {}) + return self:_request(M_UNPREPARE, netbox_opts, nil, self._stream_id, + query, parameters or {}, sql_opts or {}) end function remote_methods:wait_state(state, timeout) @@ -927,11 +1066,11 @@ function console_methods:eval(line, timeout) end if self.protocol == 'Binary' then local loader = 'return require("console").eval(...)' 
- res, err = pr(timeout, nil, false, M_EVAL, nil, nil, nil, loader, + res, err = pr(timeout, nil, false, M_EVAL, nil, nil, nil, nil, loader, {line}) else assert(self.protocol == 'Lua console') - res, err = pr(timeout, nil, false, M_INJECT, nil, nil, nil, + res, err = pr(timeout, nil, false, M_INJECT, nil, nil, nil, nil, line..'$EOF$\n') end if err then @@ -951,14 +1090,14 @@ space_metatable = function(remote) function methods:insert(tuple, opts) check_space_arg(self, 'insert') - return remote:_request(M_INSERT, opts, self._format_cdata, self.id, - tuple) + return remote:_request(M_INSERT, opts, self._format_cdata, + self._stream_id, self.id, tuple) end function methods:replace(tuple, opts) check_space_arg(self, 'replace') - return remote:_request(M_REPLACE, opts, self._format_cdata, self.id, - tuple) + return remote:_request(M_REPLACE, opts, self._format_cdata, + self._stream_id, self.id, tuple) end function methods:select(key, opts) @@ -978,7 +1117,8 @@ space_metatable = function(remote) function methods:upsert(key, oplist, opts) check_space_arg(self, 'upsert') - return nothing_or_data(remote:_request(M_UPSERT, opts, nil, self.id, + return nothing_or_data(remote:_request(M_UPSERT, opts, nil, + self._stream_id, self.id, key, oplist)) end @@ -1009,8 +1149,8 @@ index_metatable = function(remote) local offset = tonumber(opts and opts.offset) or 0 local limit = tonumber(opts and opts.limit) or 0xFFFFFFFF return (remote:_request(M_SELECT, opts, self.space._format_cdata, - self.space.id, self.id, iterator, offset, - limit, key)) + self._stream_id, self.space.id, self.id, + iterator, offset, limit, key)) end function methods:get(key, opts) @@ -1020,6 +1160,7 @@ index_metatable = function(remote) end return nothing_or_data(remote:_request(M_GET, opts, self.space._format_cdata, + self._stream_id, self.space.id, self.id, box.index.EQ, 0, 2, key)) end @@ -1031,6 +1172,7 @@ index_metatable = function(remote) end return nothing_or_data(remote:_request(M_MIN, opts, 
self.space._format_cdata, + self._stream_id, self.space.id, self.id, box.index.GE, 0, 1, key)) end @@ -1042,6 +1184,7 @@ index_metatable = function(remote) end return nothing_or_data(remote:_request(M_MAX, opts, self.space._format_cdata, + self._stream_id, self.space.id, self.id, box.index.LE, 0, 1, key)) end @@ -1053,22 +1196,24 @@ index_metatable = function(remote) end local code = string.format('box.space.%s.index.%s:count', self.space.name, self.name) - return remote:_request(M_COUNT, opts, nil, code, { key, opts }) + return remote:_request(M_COUNT, opts, nil, self._stream_id, + code, { key, opts }) end function methods:delete(key, opts) check_index_arg(self, 'delete') return nothing_or_data(remote:_request(M_DELETE, opts, self.space._format_cdata, - self.space.id, self.id, key)) + self._stream_id, self.space.id, + self.id, key)) end function methods:update(key, oplist, opts) check_index_arg(self, 'update') return nothing_or_data(remote:_request(M_UPDATE, opts, self.space._format_cdata, - self.space.id, self.id, key, - oplist)) + self._stream_id, self.space.id, + self.id, key, oplist)) end return { __index = methods, __metatable = false } diff --git a/test/box/access.result b/test/box/access.result index 712cd68f8..6434da907 100644 --- a/test/box/access.result +++ b/test/box/access.result @@ -908,15 +908,15 @@ LISTEN = require('uri').parse(box.cfg.listen) c = net.connect(LISTEN.host, LISTEN.service) --- ... -c:_request(net._method.select, nil, nil, 1, box.index.EQ, 0, 0, 0xFFFFFFFF, {}) +c:_request(net._method.select, nil, nil, nil, 1, box.index.EQ, 0, 0, 0xFFFFFFFF, {}) --- - error: Space '1' does not exist ... -c:_request(net._method.select, nil, nil, 65537, box.index.EQ, 0, 0, 0xFFFFFFFF, {}) +c:_request(net._method.select, nil, nil, nil, 65537, box.index.EQ, 0, 0, 0xFFFFFFFF, {}) --- - error: Space '65537' does not exist ... 
-c:_request(net._method.select, nil, nil, 4294967295, box.index.EQ, 0, 0, 0xFFFFFFFF, {}) +c:_request(net._method.select, nil, nil, nil, 4294967295, box.index.EQ, 0, 0, 0xFFFFFFFF, {}) --- - error: Space '4294967295' does not exist ... diff --git a/test/box/access.test.lua b/test/box/access.test.lua index 6060475d1..6abdb780d 100644 --- a/test/box/access.test.lua +++ b/test/box/access.test.lua @@ -351,9 +351,9 @@ box.schema.func.drop(name) -- very large space id, no crash occurs. LISTEN = require('uri').parse(box.cfg.listen) c = net.connect(LISTEN.host, LISTEN.service) -c:_request(net._method.select, nil, nil, 1, box.index.EQ, 0, 0, 0xFFFFFFFF, {}) -c:_request(net._method.select, nil, nil, 65537, box.index.EQ, 0, 0, 0xFFFFFFFF, {}) -c:_request(net._method.select, nil, nil, 4294967295, box.index.EQ, 0, 0, 0xFFFFFFFF, {}) +c:_request(net._method.select, nil, nil, nil, 1, box.index.EQ, 0, 0, 0xFFFFFFFF, {}) +c:_request(net._method.select, nil, nil, nil, 65537, box.index.EQ, 0, 0, 0xFFFFFFFF, {}) +c:_request(net._method.select, nil, nil, nil, 4294967295, box.index.EQ, 0, 0, 0xFFFFFFFF, {}) c:close() session = box.session diff --git a/test/box/net.box_console_connections_gh-2677.result b/test/box/net.box_console_connections_gh-2677.result index f45aa0b56..7cea0a1da 100644 --- a/test/box/net.box_console_connections_gh-2677.result +++ b/test/box/net.box_console_connections_gh-2677.result @@ -74,7 +74,7 @@ c.space.test:delete{1} -- -- Break a connection to test reconnect_after. -- -_ = c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, '\x80') +_ = c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, nil, '\x80') --- ... 
while not c:is_connected() do fiber.sleep(0.01) end diff --git a/test/box/net.box_console_connections_gh-2677.test.lua b/test/box/net.box_console_connections_gh-2677.test.lua index 40d099e70..6c4e6ea4f 100644 --- a/test/box/net.box_console_connections_gh-2677.test.lua +++ b/test/box/net.box_console_connections_gh-2677.test.lua @@ -30,7 +30,7 @@ c.space.test:delete{1} -- -- Break a connection to test reconnect_after. -- -_ = c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, '\x80') +_ = c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, nil, '\x80') while not c:is_connected() do fiber.sleep(0.01) end c:ping() diff --git a/test/box/net.box_incorrect_iterator_gh-841.result b/test/box/net.box_incorrect_iterator_gh-841.result index fbd2a7700..cd2a86787 100644 --- a/test/box/net.box_incorrect_iterator_gh-841.result +++ b/test/box/net.box_incorrect_iterator_gh-841.result @@ -16,13 +16,13 @@ test_run:cmd("setopt delimiter ';'") - true ... 
function x_select(cn, space_id, index_id, iterator, offset, limit, key, opts) - local ret = cn:_request(remote._method.select, opts, nil, space_id, + local ret = cn:_request(remote._method.select, opts, nil, nil, space_id, index_id, iterator, offset, limit, key) return ret end function x_fatal(cn) cn._transport.perform_request(nil, nil, false, remote._method.inject, - nil, nil, nil, '\x80') + nil, nil, nil, nil, '\x80') end test_run:cmd("setopt delimiter ''"); --- diff --git a/test/box/net.box_incorrect_iterator_gh-841.test.lua b/test/box/net.box_incorrect_iterator_gh-841.test.lua index 1d24f9f56..9c42175ef 100644 --- a/test/box/net.box_incorrect_iterator_gh-841.test.lua +++ b/test/box/net.box_incorrect_iterator_gh-841.test.lua @@ -5,13 +5,13 @@ test_run:cmd("push filter ".."'\\.lua.*:[0-9]+: ' to '.lua...\"]:: '") test_run:cmd("setopt delimiter ';'") function x_select(cn, space_id, index_id, iterator, offset, limit, key, opts) - local ret = cn:_request(remote._method.select, opts, nil, space_id, + local ret = cn:_request(remote._method.select, opts, nil, nil, space_id, index_id, iterator, offset, limit, key) return ret end function x_fatal(cn) cn._transport.perform_request(nil, nil, false, remote._method.inject, - nil, nil, nil, '\x80') + nil, nil, nil, nil, '\x80') end test_run:cmd("setopt delimiter ''"); diff --git a/test/box/net.box_iproto_hangs_gh-3464.result b/test/box/net.box_iproto_hangs_gh-3464.result index 3b5458c9a..cbf8181b3 100644 --- a/test/box/net.box_iproto_hangs_gh-3464.result +++ b/test/box/net.box_iproto_hangs_gh-3464.result @@ -17,7 +17,7 @@ c = net:connect(box.cfg.listen) data = msgpack.encode(18400000000000000000)..'aaaaaaa' --- ... 
-c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, data) +c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, nil, data) --- - null - Peer closed diff --git a/test/box/net.box_iproto_hangs_gh-3464.test.lua b/test/box/net.box_iproto_hangs_gh-3464.test.lua index a7c41ae76..51a9ddece 100644 --- a/test/box/net.box_iproto_hangs_gh-3464.test.lua +++ b/test/box/net.box_iproto_hangs_gh-3464.test.lua @@ -8,6 +8,6 @@ net = require('net.box') -- c = net:connect(box.cfg.listen) data = msgpack.encode(18400000000000000000)..'aaaaaaa' -c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, data) +c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, nil, data) c:close() test_run:grep_log('default', 'too big packet size in the header') ~= nil diff --git a/test/box/net.box_long-poll_input_gh-3400.result b/test/box/net.box_long-poll_input_gh-3400.result index a16110ee6..a98eea655 100644 --- a/test/box/net.box_long-poll_input_gh-3400.result +++ b/test/box/net.box_long-poll_input_gh-3400.result @@ -24,10 +24,10 @@ c:ping() -- new attempts to read any data - the connection is closed -- already. -- -f = fiber.create(c._transport.perform_request, nil, nil, false, \ - net._method.call_17, nil, nil, nil, 'long', {}) \ -c._transport.perform_request(nil, nil, false, net._method.inject, \ - nil, nil, nil, '\x80') +f = fiber.create(c._transport.perform_request, nil, nil, false, \ + net._method.call_17, nil, nil, nil, nil, 'long', {}) \ +c._transport.perform_request(nil, nil, false, net._method.inject, \ + nil, nil, nil, nil, '\x80') --- ... 
while f:status() ~= 'dead' do fiber.sleep(0.01) end diff --git a/test/box/net.box_long-poll_input_gh-3400.test.lua b/test/box/net.box_long-poll_input_gh-3400.test.lua index 891b59224..a6f302ee0 100644 --- a/test/box/net.box_long-poll_input_gh-3400.test.lua +++ b/test/box/net.box_long-poll_input_gh-3400.test.lua @@ -14,9 +14,9 @@ c:ping() -- new attempts to read any data - the connection is closed -- already. -- -f = fiber.create(c._transport.perform_request, nil, nil, false, \ - net._method.call_17, nil, nil, nil, 'long', {}) \ -c._transport.perform_request(nil, nil, false, net._method.inject, \ - nil, nil, nil, '\x80') +f = fiber.create(c._transport.perform_request, nil, nil, false, \ + net._method.call_17, nil, nil, nil, nil, 'long', {}) \ +c._transport.perform_request(nil, nil, false, net._method.inject, \ + nil, nil, nil, nil, '\x80') while f:status() ~= 'dead' do fiber.sleep(0.01) end c:close() diff --git a/test/box/stream.lua b/test/box/stream.lua new file mode 100644 index 000000000..db6a29a8a --- /dev/null +++ b/test/box/stream.lua @@ -0,0 +1,13 @@ +#!/usr/bin/env tarantool + +require('console').listen(os.getenv('ADMIN')) + +local memtx_use_mvcc_engine = (arg[2] and arg[2] == 'true' and true or false) + +box.cfg({ + listen = os.getenv('LISTEN'), + iproto_threads = tonumber(arg[1]), + memtx_use_mvcc_engine = memtx_use_mvcc_engine +}) + +box.schema.user.grant('guest', 'read,write,execute,create,drop', 'universe', nil, {if_not_exists = true}) diff --git a/test/box/stream.result b/test/box/stream.result new file mode 100644 index 000000000..03200ecf6 --- /dev/null +++ b/test/box/stream.result @@ -0,0 +1,485 @@ +-- test-run result file version 2 +-- This test checks streams iplementation in iproto (gh-5860). +net_box = require('net.box') + | --- + | ... +fiber = require('fiber') + | --- + | ... +test_run = require('test_run').new() + | --- + | ... + +test_run:cmd("create server test with script='box/stream.lua'") + | --- + | - true + | ... 
+ +test_run:cmd("setopt delimiter ';'") + | --- + | - true + | ... +function get_current_connection_count() + local total_net_stat_table = + test_run:cmd(string.format("eval test 'return box.stat.net()'"))[1] + assert(total_net_stat_table) + local connection_stat_table = total_net_stat_table.CONNECTIONS + assert(connection_stat_table) + return connection_stat_table.current +end; + | --- + | ... +function wait_and_return_results(futures) + local results = {} + for name, future in pairs(futures) do + local err + results[name], err = future:wait_result() + if err then + results[name] = err + end + end + return results +end; + | --- + | ... +test_run:cmd("setopt delimiter ''"); + | --- + | - true + | ... + +-- Some simple checks for new object - stream +test_run:cmd("start server test with args='1'") + | --- + | - true + | ... +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] + | --- + | ... +conn_1 = net_box.connect(server_addr) + | --- + | ... +stream_1 = conn_1:new_stream() + | --- + | ... +conn_2 = net_box.connect(server_addr) + | --- + | ... +stream_2 = conn_2:new_stream() + | --- + | ... +-- Stream is a wrapper around connection, so if you close connection +-- you close stream, and vice versa. +conn_1:close() + | --- + | ... +assert(not stream_1:ping()) + | --- + | - true + | ... +stream_2:close() + | --- + | ... +assert(not conn_2:ping()) + | --- + | - true + | ... +conn = net_box.connect(server_addr) + | --- + | ... +stream = conn:new_stream() + | --- + | ... +-- The new method `new_stream`, for the stream object, returns a new +-- stream object, just as in the case of connection. +_ = stream:new_stream() + | --- + | ... +conn:close() + | --- + | ... + +-- Check that spaces in stream object updates, during reload_schema +conn = net_box.connect(server_addr) + | --- + | ... +stream = conn:new_stream() + | --- + | ... +test_run:switch("test") + | --- + | - true + | ... 
+-- Create one space on server +s = box.schema.space.create('test', { engine = 'memtx' }) + | --- + | ... +_ = s:create_index('primary') + | --- + | ... +test_run:switch("default") + | --- + | - true + | ... +assert(not conn.space.test) + | --- + | - true + | ... +assert(not stream.space.test) + | --- + | - true + | ... +assert(conn.schema_version == stream._schema_version) + | --- + | - true + | ... +conn:reload_schema() + | --- + | ... +assert(conn.space.test ~= nil) + | --- + | - true + | ... +assert(conn.schema_version ~= stream._schema_version) + | --- + | - true + | ... +assert(stream.space.test ~= nil) + | --- + | - true + | ... +-- When we touch stream.space, we compare stream._schema_version +-- and conn.schema_version if they are not equal, we clear stream +-- space cache, update it's _schema_version and load space from +-- connection to stream space cache. +assert(conn.schema_version == stream._schema_version) + | --- + | - true + | ... +collectgarbage() + | --- + | - 0 + | ... +collectgarbage() + | --- + | - 0 + | ... +assert(conn.space.test ~= nil) + | --- + | - true + | ... +assert(stream.space.test ~= nil) + | --- + | - true + | ... +test_run:switch("test") + | --- + | - true + | ... +s:drop() + | --- + | ... +test_run:switch("default") + | --- + | - true + | ... +conn:reload_schema() + | --- + | ... +assert(not conn.space.test) + | --- + | - true + | ... +assert(not stream.space.test) + | --- + | - true + | ... +test_run:cmd("stop server test") + | --- + | - true + | ... + +-- All test works with iproto_thread count = 10 + +test_run:cmd("start server test with args='10'") + | --- + | - true + | ... +test_run:switch('test') + | --- + | - true + | ... +fiber = require('fiber') + | --- + | ... +s = box.schema.space.create('test', { engine = 'memtx' }) + | --- + | ... +_ = s:create_index('primary') + | --- + | ... +test_run:cmd("setopt delimiter ';'") + | --- + | - true + | ... 
+function replace_with_yeild(item) + fiber.sleep(0.1) + return s:replace({item}) +end; + | --- + | ... +test_run:cmd("setopt delimiter ''"); + | --- + | - true + | ... +test_run:switch('default') + | --- + | - true + | ... + +conn = net_box.connect(server_addr) + | --- + | ... +assert(conn:ping()) + | --- + | - true + | ... +conn_space = conn.space.test + | --- + | ... +stream = conn:new_stream() + | --- + | ... +stream_space = stream.space.test + | --- + | ... + +-- Check that all requests in stream processed consistently +futures = {} + | --- + | ... +replace_count = 3 + | --- + | ... +test_run:cmd("setopt delimiter ';'") + | --- + | - true + | ... +for i = 1, replace_count do + futures[string.format("replace_%d", i)] = + stream_space:replace({i}, {is_async = true}) + futures[string.format("select_%d", i)] = + stream_space:select({}, {is_async = true}) +end; + | --- + | ... +futures["replace_with_yeild_for_stream"] = + stream:call("replace_with_yeild", + { replace_count + 1 }, {is_async = true}); + | --- + | ... +futures["select_with_yeild_for_stream"] = + stream_space:select({}, {is_async = true}); + | --- + | ... +test_run:cmd("setopt delimiter ''"); + | --- + | - true + | ... +results = wait_and_return_results(futures) + | --- + | ... +-- [1] +assert(results["select_1"]) + | --- + | - - [1] + | ... +-- [1] [2] +assert(results["select_2"]) + | --- + | - - [1] + | - [2] + | ... +-- [1] [2] [3] +assert(results["select_3"]) + | --- + | - - [1] + | - [2] + | - [3] + | ... +-- [1] [2] [3] [4] +-- Even yeild in replace function does not affect +-- the order of requests execution in stream +assert(results["select_with_yeild_for_stream"]) + | --- + | - - [1] + | - [2] + | - [3] + | - [4] + | ... + +-- There is no request execution order for the connection +futures = {} + | --- + | ... +test_run:cmd("setopt delimiter ';'") + | --- + | - true + | ... 
+futures["replace_with_yeild_for_connection"] = + conn:call("replace_with_yeild", { replace_count + 2 }, {is_async = true}); + | --- + | ... +futures["select_with_yeild_for_connection"] = + conn_space:select({}, {is_async = true}); + | --- + | ... +test_run:cmd("setopt delimiter ''"); + | --- + | - true + | ... +results = wait_and_return_results(futures) + | --- + | ... +-- [1] [2] [3] [4] +-- Select will be processed earlier because of +-- yeild in `replace_with_yeild` function +assert(results["select_with_yeild_for_connection"]) + | --- + | - - [1] + | - [2] + | - [3] + | - [4] + | ... +test_run:switch("test") + | --- + | - true + | ... +-- [1] [2] [3] [4] [5] +s:select() + | --- + | - - [1] + | - [2] + | - [3] + | - [4] + | - [5] + | ... +errinj = box.error.injection + | --- + | ... +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) + | --- + | - true + | ... +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) + | --- + | - true + | ... +test_run:switch('default') + | --- + | - true + | ... +conn:close() + | --- + | ... +test_run:wait_cond(function () return get_current_connection_count() == 0 end) + | --- + | - true + | ... + +-- Check that all request will be processed +-- after connection close. +conn = net_box.connect(server_addr) + | --- + | ... +stream = conn:new_stream() + | --- + | ... +space = stream.space.test + | --- + | ... +test_run:cmd("setopt delimiter ';'") + | --- + | - true + | ... +replace_count = 20 +for i = 1, replace_count do + space:replace({i}, {is_async = true}) +end; + | --- + | ... +test_run:cmd("setopt delimiter ''"); + | --- + | - true + | ... +-- Give time to send +fiber.sleep(0) + | --- + | ... +conn:close() + | --- + | ... +test_run:wait_cond(function () return get_current_connection_count() == 0 end) + | --- + | - true + | ... +test_run:switch("test") + | --- + | - true + | ... 
+-- select return tuples from [1] to [20] +-- because all messages processed after +-- connection closed +s:select{} + | --- + | - - [1] + | - [2] + | - [3] + | - [4] + | - [5] + | - [6] + | - [7] + | - [8] + | - [9] + | - [10] + | - [11] + | - [12] + | - [13] + | - [14] + | - [15] + | - [16] + | - [17] + | - [18] + | - [19] + | - [20] + | ... +s:drop() + | --- + | ... +errinj = box.error.injection + | --- + | ... +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) + | --- + | - true + | ... +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) + | --- + | - true + | ... +test_run:switch("default") + | --- + | - true + | ... +test_run:cmd("stop server test") + | --- + | - true + | ... + +test_run:cmd("cleanup server test") + | --- + | - true + | ... +test_run:cmd("delete server test") + | --- + | - true + | ... diff --git a/test/box/stream.test.lua b/test/box/stream.test.lua new file mode 100644 index 000000000..72129a228 --- /dev/null +++ b/test/box/stream.test.lua @@ -0,0 +1,182 @@ +-- This test checks streams iplementation in iproto (gh-5860). 
+net_box = require('net.box') +fiber = require('fiber') +test_run = require('test_run').new() + +test_run:cmd("create server test with script='box/stream.lua'") + +test_run:cmd("setopt delimiter ';'") +function get_current_connection_count() + local total_net_stat_table = + test_run:cmd(string.format("eval test 'return box.stat.net()'"))[1] + assert(total_net_stat_table) + local connection_stat_table = total_net_stat_table.CONNECTIONS + assert(connection_stat_table) + return connection_stat_table.current +end; +function wait_and_return_results(futures) + local results = {} + for name, future in pairs(futures) do + local err + results[name], err = future:wait_result() + if err then + results[name] = err + end + end + return results +end; +test_run:cmd("setopt delimiter ''"); + +-- Some simple checks for new object - stream +test_run:cmd("start server test with args='1'") +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] +conn_1 = net_box.connect(server_addr) +stream_1 = conn_1:new_stream() +conn_2 = net_box.connect(server_addr) +stream_2 = conn_2:new_stream() +-- Stream is a wrapper around connection, so if you close connection +-- you close stream, and vice versa. +conn_1:close() +assert(not stream_1:ping()) +stream_2:close() +assert(not conn_2:ping()) +conn = net_box.connect(server_addr) +stream = conn:new_stream() +-- The new method `new_stream`, for the stream object, returns a new +-- stream object, just as in the case of connection. 
+_ = stream:new_stream() +conn:close() + +-- Check that spaces in stream object updates, during reload_schema +conn = net_box.connect(server_addr) +stream = conn:new_stream() +test_run:switch("test") +-- Create one space on server +s = box.schema.space.create('test', { engine = 'memtx' }) +_ = s:create_index('primary') +test_run:switch("default") +assert(not conn.space.test) +assert(not stream.space.test) +assert(conn.schema_version == stream._schema_version) +conn:reload_schema() +assert(conn.space.test ~= nil) +assert(conn.schema_version ~= stream._schema_version) +assert(stream.space.test ~= nil) +-- When we touch stream.space, we compare stream._schema_version +-- and conn.schema_version if they are not equal, we clear stream +-- space cache, update it's _schema_version and load space from +-- connection to stream space cache. +assert(conn.schema_version == stream._schema_version) +collectgarbage() +collectgarbage() +assert(conn.space.test ~= nil) +assert(stream.space.test ~= nil) +test_run:switch("test") +s:drop() +test_run:switch("default") +conn:reload_schema() +assert(not conn.space.test) +assert(not stream.space.test) +test_run:cmd("stop server test") + +-- All test works with iproto_thread count = 10 + +test_run:cmd("start server test with args='10'") +test_run:switch('test') +fiber = require('fiber') +s = box.schema.space.create('test', { engine = 'memtx' }) +_ = s:create_index('primary') +test_run:cmd("setopt delimiter ';'") +function replace_with_yeild(item) + fiber.sleep(0.1) + return s:replace({item}) +end; +test_run:cmd("setopt delimiter ''"); +test_run:switch('default') + +conn = net_box.connect(server_addr) +assert(conn:ping()) +conn_space = conn.space.test +stream = conn:new_stream() +stream_space = stream.space.test + +-- Check that all requests in stream processed consistently +futures = {} +replace_count = 3 +test_run:cmd("setopt delimiter ';'") +for i = 1, replace_count do + futures[string.format("replace_%d", i)] = + 
stream_space:replace({i}, {is_async = true}) + futures[string.format("select_%d", i)] = + stream_space:select({}, {is_async = true}) +end; +futures["replace_with_yeild_for_stream"] = + stream:call("replace_with_yeild", + { replace_count + 1 }, {is_async = true}); +futures["select_with_yeild_for_stream"] = + stream_space:select({}, {is_async = true}); +test_run:cmd("setopt delimiter ''"); +results = wait_and_return_results(futures) +-- [1] +assert(results["select_1"]) +-- [1] [2] +assert(results["select_2"]) +-- [1] [2] [3] +assert(results["select_3"]) +-- [1] [2] [3] [4] +-- Even yeild in replace function does not affect +-- the order of requests execution in stream +assert(results["select_with_yeild_for_stream"]) + +-- There is no request execution order for the connection +futures = {} +test_run:cmd("setopt delimiter ';'") +futures["replace_with_yeild_for_connection"] = + conn:call("replace_with_yeild", { replace_count + 2 }, {is_async = true}); +futures["select_with_yeild_for_connection"] = + conn_space:select({}, {is_async = true}); +test_run:cmd("setopt delimiter ''"); +results = wait_and_return_results(futures) +-- [1] [2] [3] [4] +-- Select will be processed earlier because of +-- yeild in `replace_with_yeild` function +assert(results["select_with_yeild_for_connection"]) +test_run:switch("test") +-- [1] [2] [3] [4] [5] +s:select() +errinj = box.error.injection +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +test_run:switch('default') +conn:close() +test_run:wait_cond(function () return get_current_connection_count() == 0 end) + +-- Check that all request will be processed +-- after connection close. 
+conn = net_box.connect(server_addr) +stream = conn:new_stream() +space = stream.space.test +test_run:cmd("setopt delimiter ';'") +replace_count = 20 +for i = 1, replace_count do + space:replace({i}, {is_async = true}) +end; +test_run:cmd("setopt delimiter ''"); +-- Give time to send +fiber.sleep(0) +conn:close() +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +test_run:switch("test") +-- select return tuples from [1] to [20] +-- because all messages processed after +-- connection closed +s:select{} +s:drop() +errinj = box.error.injection +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +test_run:switch("default") +test_run:cmd("stop server test") + +test_run:cmd("cleanup server test") +test_run:cmd("delete server test") diff --git a/test/box/suite.ini b/test/box/suite.ini index b5d869fb3..94cf7811f 100644 --- a/test/box/suite.ini +++ b/test/box/suite.ini @@ -5,7 +5,7 @@ script = box.lua disabled = rtree_errinj.test.lua tuple_bench.test.lua long_run = huge_field_map_long.test.lua config = engine.cfg -release_disabled = errinj.test.lua errinj_index.test.lua rtree_errinj.test.lua upsert_errinj.test.lua iproto_stress.test.lua gh-4648-func-load-unload.test.lua gh-5645-several-iproto-threads.test.lua net.box_discard_console_request_gh-6249.test.lua +release_disabled = errinj.test.lua errinj_index.test.lua rtree_errinj.test.lua upsert_errinj.test.lua iproto_stress.test.lua gh-4648-func-load-unload.test.lua gh-5645-several-iproto-threads.test.lua net.box_discard_console_request_gh-6249.test.lua stream.test.lua lua_libs = lua/fifo.lua lua/utils.lua lua/bitset.lua lua/index_random_test.lua lua/push.lua lua/identifier.lua lua/txn_proxy.lua use_unix_sockets = True use_unix_sockets_iproto = True -- 2.20.1
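A note on the schema check that the stream test comments describe ("When we touch stream.space, we compare stream._schema_version and conn.schema_version ..."). The following is a minimal sketch of that lazy invalidation in plain Lua, not the net.box implementation from this series. `new_conn`, `new_stream`, `_conn` and `_space_cache` are hypothetical names used only for illustration, the connection is an ordinary table rather than a real net.box object, and the stream hands back the connection's space table directly instead of a stream-bound wrapper.

```lua
-- Hypothetical stand-in for a net.box connection: just a schema version
-- and a table of spaces. Not the real net.box object.
local function new_conn()
    return { schema_version = 1, space = {} }
end

-- Hypothetical stream wrapper. `space` is not stored on the stream; it is
-- resolved through __index so every access can check the schema version.
local function new_stream(conn)
    local stream = {
        _conn = conn,
        _schema_version = conn.schema_version,
        _space_cache = {},
    }
    return setmetatable(stream, {
        __index = function(self, key)
            if key ~= 'space' then
                return nil
            end
            -- The connection schema moved on: clear the stream's space
            -- cache and remember the new version.
            if self._schema_version ~= self._conn.schema_version then
                self._space_cache = {}
                self._schema_version = self._conn.schema_version
            end
            -- Spaces are loaded from the connection on first use.
            return setmetatable({}, {
                __index = function(_, name)
                    local cached = self._space_cache[name]
                    if cached == nil then
                        cached = self._conn.space[name]
                        self._space_cache[name] = cached
                    end
                    return cached
                end,
            })
        end,
    })
end

local conn = new_conn()
local stream = new_stream(conn)
assert(stream.space.test == nil)

-- Simulate conn:reload_schema() after a space was created on the server.
conn.space.test = { name = 'test' }
conn.schema_version = conn.schema_version + 1
assert(conn.schema_version ~= stream._schema_version)

-- Touching stream.space refreshes the cache and the stored version.
assert(stream.space.test ~= nil)
assert(conn.schema_version == stream._schema_version)
```

The asserts mirror the sequence in test/box/stream.test.lua: the versions differ right after the reload and match again only once stream.space has been accessed.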