From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from [87.239.111.99] (localhost [127.0.0.1]) by dev.tarantool.org (Postfix) with ESMTP id 8F53771211; Thu, 5 Aug 2021 21:20:20 +0300 (MSK) DKIM-Filter: OpenDKIM Filter v2.11.0 dev.tarantool.org 8F53771211 DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=tarantool.org; s=dev; t=1628187620; bh=Y5lLSSeg3lEU918GPxRq5xIAZtxsD0oZs+dgG7fw9XQ=; h=To:Cc:Date:In-Reply-To:References:Subject:List-Id: List-Unsubscribe:List-Archive:List-Post:List-Help:List-Subscribe: From:Reply-To:From; b=eWi6lxEYhniZEmi8XrDpCMMHFbBAqCboEymojfrR4++TYex22SGJ/RvTOIspfwg5M qTkX2+w6kL0+K/z2XXKH6q481juTA3qLFMzyGtHQPtnhfz05JKaePwZBqGWvg7D6bz hvy7XAGuT0AXs/IxrM7Y17g8GOMdjAIWxhtusE8c= Received: from smtp53.i.mail.ru (smtp53.i.mail.ru [94.100.177.113]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id C312171211 for ; Thu, 5 Aug 2021 21:17:54 +0300 (MSK) DKIM-Filter: OpenDKIM Filter v2.11.0 dev.tarantool.org C312171211 Received: by smtp53.i.mail.ru with esmtpa (envelope-from ) id 1mBhwH-0005CS-Ek; Thu, 05 Aug 2021 21:17:54 +0300 To: v.shpilevoy@tarantool.org, vdavydov@tarantool.org Cc: tarantool-patches@dev.tarantool.org, mechanik20051988 Date: Thu, 5 Aug 2021 21:17:43 +0300 Message-Id: <6ec4b53e9bb3ea0d1d5873d47646ece3c93df6b4.1628184138.git.mechanik20.05.1988@gmail.com> X-Mailer: git-send-email 2.20.1 In-Reply-To: References: MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit X-7564579A: B8F34718100C35BD X-77F55803: 4F1203BC0FB41BD92087353F0EC44DD9ECFD080E047A606F6525B29142351271182A05F538085040F2A56503A91DCB4923C790C6CBEC5EB70B6CA9EEC626628E100510F1631F3983 X-7FA49CB5: FF5795518A3D127A4AD6D5ED66289B5278DA827A17800CE7956F10FFCC7409BAEA1F7E6F0F101C67BD4B6F7A4D31EC0BCC500DACC3FED6E28638F802B75D45FF8AA50765F79006371B0187663A04449C8638F802B75D45FF36EB9D2243A4F8B5A6FCA7DBDB1FC311F39EFFDF887939037866D6147AF826D889AF5C5212E9B1266B266A1FCDD1B748117882F4460429724CE54428C33FAD305F5C1EE8F4F765FC8883BAB8B32E402CA471835C12D1D9774AD6D5ED66289B52BA9C0B312567BB23117882F4460429728776938767073520B28585415E75ADA9C26CFBAC0749D213D2E47CDBA5A96583BA9C0B312567BB2376E601842F6C81A19E625A9149C048EE41BF15D38FB6CB3A1B0CC92B5A49C88ED8FC6C240DEA7642DBF02ECDB25306B2B78CF848AE20165D0A6AB1C7CE11FEE37527C25B2C93CE7603F1AB874ED89028C4224003CC836476EA7A3FFF5B025636E2021AF6380DFAD1A18204E546F3947CB11811A4A51E3B096D1867E19FE1407959CC434672EE6371089D37D7C0E48F6C8AA50765F790063780E7E366B0FF8F58EFF80C71ABB335746BA297DBC24807EABDAD6C7F3747799A X-C1DE0DAB: C20DE7B7AB408E4181F030C43753B8186998911F362727C414F749A5E30D975C30CE973C7F71088D0D0ACDE1AFC56BC32E0CD862A10B72989C2B6934AE262D3EE7EAB7254005DCED7532B743992DF240BDC6A1CF3F042BAD6DF99611D93F60EF3033054805BDE987699F904B3F4130E343918A1A30D5E7FCCB5012B2E24CD356 X-C8649E89: 4E36BF7865823D7055A7F0CF078B5EC49A30900B95165D34A9A0A0BF1A2CAC62937D9F16A807686BC71A3CADCF28B88F1FFD0A06F4EE6208D4342B749B9B5EE71D7E09C32AA3244CA5D48C6B61238DC7C864B6B5964E95A8259227199D06760A927AC6DF5659F194 X-D57D3AED: 3ZO7eAau8CL7WIMRKs4sN3D3tLDjz0dLbV79QFUyzQ2Ujvy7cMT6pYYqY16iZVKkSc3dCLJ7zSJH7+u4VD18S7Vl4ZUrpaVfd2+vE6kuoey4m4VkSEu530nj6fImhcD4MUrOEAnl0W826KZ9Q+tr5ycPtXkTV4k65bRjmOUUP8cvGozZ33TWg5HZplvhhXbhDGzqmQDTd6OAevLeAnq3Ra9uf7zvY2zzsIhlcp/Y7m53TZgf2aB4JOg4gkr2biojh4v93/7HD3UhAR/rqrs2uA== X-Mailru-Sender: 583F1D7ACE8F49BD29FC049B2A5BF963272C3B768E89E9F113B0564DF997F68D9D956CDB35114E57B79567116EAC6FCF4E830D9205DBEA545646F0D3C63A617F27ACC94E9A535D22112434F685709FCF0DA7A0AF5A3A8387 X-Mras: Ok Subject: [Tarantool-patches] [PATCH 5/7] net.box: add stream support to net.box X-BeenThere: tarantool-patches@dev.tarantool.org X-Mailman-Version: 2.1.34 Precedence: list List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , From: mechanik20051988 via Tarantool-patches Reply-To: mechanik20051988 Errors-To: tarantool-patches-bounces@dev.tarantool.org Sender: "Tarantool-patches" From: mechanik20051988 Add stream support to `net.box`. In "net.box", stream is an object over connection that has the same methods, but all requests from it sends with non-zero stream ID. Since there can be a lot of streams, we do not copy the spaces from the connection to the stream immediately when creating a stream, but do it only when we first access space. Also, when updating the schema, we update the spaces only for those streams that have had at least one access to the space. Part of #5860 @TarantoolBot document Title: stream support was added to net.box In "net.box", stream is an object over connection that has the same methods, but all requests from it sends with non-zero stream ID. Stream ID is generated on the client side in two ways: automatically or manually. User can choose any of two methods, but can not mix them. Simple example of stream creation using net.box: ```lua -- automatically generated stream_id stream = conn:stream() -- manually chosen stream_id stream = conn:stream(1) ``` --- src/box/lua/net_box.c | 101 ++-- src/box/lua/net_box.lua | 191 ++++-- test/box/access.result | 6 +- test/box/access.test.lua | 6 +- ...net.box_console_connections_gh-2677.result | 2 +- ...t.box_console_connections_gh-2677.test.lua | 2 +- .../net.box_incorrect_iterator_gh-841.result | 4 +- ...net.box_incorrect_iterator_gh-841.test.lua | 4 +- test/box/net.box_iproto_hangs_gh-3464.result | 2 +- .../box/net.box_iproto_hangs_gh-3464.test.lua | 2 +- .../net.box_long-poll_input_gh-3400.result | 8 +- .../net.box_long-poll_input_gh-3400.test.lua | 8 +- test/box/stream.lua | 13 + test/box/stream.result | 553 ++++++++++++++++++ test/box/stream.test.lua | 207 +++++++ test/box/suite.ini | 2 +- 16 files changed, 1011 insertions(+), 100 deletions(-) create mode 100644 test/box/stream.lua create mode 100644 test/box/stream.result create mode 100644 test/box/stream.test.lua diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c index 82efc483d..ec850cd9f 100644 --- a/src/box/lua/net_box.c +++ b/src/box/lua/net_box.c @@ -76,7 +76,7 @@ enum netbox_method { static inline size_t netbox_prepare_request(struct mpstream *stream, uint64_t sync, - enum iproto_type type) + enum iproto_type type, uint64_t stream_id) { /* Remember initial size of ibuf (see netbox_encode_request()) */ struct ibuf *ibuf = stream->ctx; @@ -88,7 +88,7 @@ netbox_prepare_request(struct mpstream *stream, uint64_t sync, mpstream_advance(stream, fixheader_size); /* encode header */ - mpstream_encode_map(stream, 2); + mpstream_encode_map(stream, stream_id != 0 ? 3 : 2); mpstream_encode_uint(stream, IPROTO_SYNC); mpstream_encode_uint(stream, sync); @@ -96,6 +96,10 @@ netbox_prepare_request(struct mpstream *stream, uint64_t sync, mpstream_encode_uint(stream, IPROTO_REQUEST_TYPE); mpstream_encode_uint(stream, type); + if (stream_id != 0) { + mpstream_encode_uint(stream, IPROTO_STREAM_ID); + mpstream_encode_uint(stream, stream_id); + } /* Caller should remember how many bytes was used in ibuf */ return used; } @@ -128,20 +132,21 @@ netbox_encode_request(struct mpstream *stream, size_t initial_size) static void netbox_encode_ping(lua_State *L, int idx, struct mpstream *stream, - uint64_t sync) + uint64_t sync, uint64_t stream_id) { (void)L; (void)idx; - size_t svp = netbox_prepare_request(stream, sync, IPROTO_PING); + size_t svp = netbox_prepare_request(stream, sync, + IPROTO_PING, stream_id); netbox_encode_request(stream, svp); } static int netbox_encode_auth(lua_State *L) { - if (lua_gettop(L) < 5) { + if (lua_gettop(L) < 6) { return luaL_error(L, "Usage: netbox.encode_update(ibuf, sync, " - "user, password, greeting)"); + "stream_id, user, password, greeting)"); } struct ibuf *ibuf = (struct ibuf *)lua_topointer(L, 1); uint64_t sync = luaL_touint64(L, 2); @@ -149,14 +154,14 @@ netbox_encode_auth(lua_State *L) struct mpstream stream; mpstream_init(&stream, ibuf, ibuf_reserve_cb, ibuf_alloc_cb, luamp_error, L); - size_t svp = netbox_prepare_request(&stream, sync, IPROTO_AUTH); + size_t svp = netbox_prepare_request(&stream, sync, IPROTO_AUTH, 0); size_t user_len; - const char *user = lua_tolstring(L, 3, &user_len); + const char *user = lua_tolstring(L, 4, &user_len); size_t password_len; - const char *password = lua_tolstring(L, 4, &password_len); + const char *password = lua_tolstring(L, 5, &password_len); size_t salt_len; - const char *salt = lua_tolstring(L, 5, &salt_len); + const char *salt = lua_tolstring(L, 6, &salt_len); if (salt_len < SCRAMBLE_SIZE) return luaL_error(L, "Invalid salt"); @@ -179,11 +184,10 @@ netbox_encode_auth(lua_State *L) static void netbox_encode_call_impl(lua_State *L, int idx, struct mpstream *stream, - uint64_t sync, enum iproto_type type) + uint64_t sync, enum iproto_type type, uint64_t stream_id) { /* Lua stack at idx: function_name, args */ - size_t svp = netbox_prepare_request(stream, sync, type); - + size_t svp = netbox_prepare_request(stream, sync, type, stream_id); mpstream_encode_map(stream, 2); /* encode proc name */ @@ -201,24 +205,26 @@ netbox_encode_call_impl(lua_State *L, int idx, struct mpstream *stream, static void netbox_encode_call_16(lua_State *L, int idx, struct mpstream *stream, - uint64_t sync) + uint64_t sync, uint64_t stream_id) { - netbox_encode_call_impl(L, idx, stream, sync, IPROTO_CALL_16); + netbox_encode_call_impl(L, idx, stream, sync, + IPROTO_CALL_16, stream_id); } static void netbox_encode_call(lua_State *L, int idx, struct mpstream *stream, - uint64_t sync) + uint64_t sync, uint64_t stream_id) { - netbox_encode_call_impl(L, idx, stream, sync, IPROTO_CALL); + netbox_encode_call_impl(L, idx, stream, sync, IPROTO_CALL, stream_id); } static void netbox_encode_eval(lua_State *L, int idx, struct mpstream *stream, - uint64_t sync) + uint64_t sync, uint64_t stream_id) { /* Lua stack at idx: expr, args */ - size_t svp = netbox_prepare_request(stream, sync, IPROTO_EVAL); + size_t svp = netbox_prepare_request(stream, sync, + IPROTO_EVAL, stream_id); mpstream_encode_map(stream, 2); @@ -237,10 +243,11 @@ netbox_encode_eval(lua_State *L, int idx, struct mpstream *stream, static void netbox_encode_select(lua_State *L, int idx, struct mpstream *stream, - uint64_t sync) + uint64_t sync, uint64_t stream_id) { /* Lua stack at idx: space_id, index_id, iterator, offset, limit, key */ - size_t svp = netbox_prepare_request(stream, sync, IPROTO_SELECT); + size_t svp = netbox_prepare_request(stream, sync, + IPROTO_SELECT, stream_id); mpstream_encode_map(stream, 6); @@ -279,10 +286,11 @@ netbox_encode_select(lua_State *L, int idx, struct mpstream *stream, static void netbox_encode_insert_or_replace(lua_State *L, int idx, struct mpstream *stream, - uint64_t sync, enum iproto_type type) + uint64_t sync, enum iproto_type type, + uint64_t stream_id) { /* Lua stack at idx: space_id, tuple */ - size_t svp = netbox_prepare_request(stream, sync, type); + size_t svp = netbox_prepare_request(stream, sync, type, stream_id); mpstream_encode_map(stream, 2); @@ -300,24 +308,27 @@ netbox_encode_insert_or_replace(lua_State *L, int idx, struct mpstream *stream, static void netbox_encode_insert(lua_State *L, int idx, struct mpstream *stream, - uint64_t sync) + uint64_t sync, uint64_t stream_id) { - netbox_encode_insert_or_replace(L, idx, stream, sync, IPROTO_INSERT); + netbox_encode_insert_or_replace(L, idx, stream, sync, + IPROTO_INSERT, stream_id); } static void netbox_encode_replace(lua_State *L, int idx, struct mpstream *stream, - uint64_t sync) + uint64_t sync, uint64_t stream_id) { - netbox_encode_insert_or_replace(L, idx, stream, sync, IPROTO_REPLACE); + netbox_encode_insert_or_replace(L, idx, stream, sync, + IPROTO_REPLACE, stream_id); } static void netbox_encode_delete(lua_State *L, int idx, struct mpstream *stream, - uint64_t sync) + uint64_t sync, uint64_t stream_id) { /* Lua stack at idx: space_id, index_id, key */ - size_t svp = netbox_prepare_request(stream, sync, IPROTO_DELETE); + size_t svp = netbox_prepare_request(stream, sync, + IPROTO_DELETE, stream_id); mpstream_encode_map(stream, 3); @@ -340,10 +351,11 @@ netbox_encode_delete(lua_State *L, int idx, struct mpstream *stream, static void netbox_encode_update(lua_State *L, int idx, struct mpstream *stream, - uint64_t sync) + uint64_t sync, uint64_t stream_id) { /* Lua stack at idx: space_id, index_id, key, ops */ - size_t svp = netbox_prepare_request(stream, sync, IPROTO_UPDATE); + size_t svp = netbox_prepare_request(stream, sync, + IPROTO_UPDATE, stream_id); mpstream_encode_map(stream, 5); @@ -374,10 +386,11 @@ netbox_encode_update(lua_State *L, int idx, struct mpstream *stream, static void netbox_encode_upsert(lua_State *L, int idx, struct mpstream *stream, - uint64_t sync) + uint64_t sync, uint64_t stream_id) { /* Lua stack at idx: space_id, tuple, ops */ - size_t svp = netbox_prepare_request(stream, sync, IPROTO_UPSERT); + size_t svp = netbox_prepare_request(stream, sync, + IPROTO_UPSERT, stream_id); mpstream_encode_map(stream, 4); @@ -547,10 +560,11 @@ handle_error: static void netbox_encode_execute(lua_State *L, int idx, struct mpstream *stream, - uint64_t sync) + uint64_t sync, uint64_t stream_id) { /* Lua stack at idx: query, parameters, options */ - size_t svp = netbox_prepare_request(stream, sync, IPROTO_EXECUTE); + size_t svp = netbox_prepare_request(stream, sync, + IPROTO_EXECUTE, stream_id); mpstream_encode_map(stream, 3); @@ -576,10 +590,11 @@ netbox_encode_execute(lua_State *L, int idx, struct mpstream *stream, static void netbox_encode_prepare(lua_State *L, int idx, struct mpstream *stream, - uint64_t sync) + uint64_t sync, uint64_t stream_id) { /* Lua stack at idx: query */ - size_t svp = netbox_prepare_request(stream, sync, IPROTO_PREPARE); + size_t svp = netbox_prepare_request(stream, sync, + IPROTO_PREPARE, stream_id); mpstream_encode_map(stream, 1); @@ -599,18 +614,19 @@ netbox_encode_prepare(lua_State *L, int idx, struct mpstream *stream, static void netbox_encode_unprepare(lua_State *L, int idx, struct mpstream *stream, - uint64_t sync) + uint64_t sync, uint64_t stream_id) { /* Lua stack at idx: query, parameters, options */ - netbox_encode_prepare(L, idx, stream, sync); + netbox_encode_prepare(L, idx, stream, sync, stream_id); } static void netbox_encode_inject(struct lua_State *L, int idx, struct mpstream *stream, - uint64_t sync) + uint64_t sync, uint64_t stream_id) { /* Lua stack at idx: bytes */ (void)sync; + (void)stream_id; size_t len; const char *data = lua_tolstring(L, idx, &len); mpstream_memcpy(stream, data, len); @@ -632,7 +648,7 @@ netbox_encode_method(struct lua_State *L) { typedef void (*method_encoder_f)(struct lua_State *L, int idx, struct mpstream *stream, - uint64_t sync); + uint64_t sync, uint64_t stream_id); static method_encoder_f method_encoder[] = { [NETBOX_PING] = netbox_encode_ping, [NETBOX_CALL_16] = netbox_encode_call_16, @@ -657,10 +673,11 @@ netbox_encode_method(struct lua_State *L) assert(method < netbox_method_MAX); struct ibuf *ibuf = (struct ibuf *)lua_topointer(L, 2); uint64_t sync = luaL_touint64(L, 3); + uint64_t stream_id = luaL_touint64(L, 4); struct mpstream stream; mpstream_init(&stream, ibuf, ibuf_reserve_cb, ibuf_alloc_cb, luamp_error, L); - method_encoder[method](L, 4, &stream, sync); + method_encoder[method](L, 5, &stream, sync, stream_id); return 0; } diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua index 9af6028eb..bf6a89e15 100644 --- a/src/box/lua/net_box.lua +++ b/src/box/lua/net_box.lua @@ -483,7 +483,7 @@ local function create_transport(host, port, user, password, callback, -- @retval not nil Future object. -- local function perform_async_request(buffer, skip_header, method, on_push, - on_push_ctx, format, ...) + on_push_ctx, format, stream_id, ...) if state ~= 'active' and state ~= 'fetch_schema' then local code = last_errno or E_NO_CONNECTION local msg = last_error or @@ -497,7 +497,7 @@ local function create_transport(host, port, user, password, callback, worker_fiber:wakeup() end local id = next_request_id - encode_method(method, send_buf, id, ...) + encode_method(method, send_buf, id, stream_id, ...) next_request_id = next_id(id) -- Request in most cases has maximum 10 members: -- method, buffer, skip_header, id, cond, errno, response, @@ -521,10 +521,10 @@ local function create_transport(host, port, user, password, callback, -- @retval not nil Response object. -- local function perform_request(timeout, buffer, skip_header, method, - on_push, on_push_ctx, format, ...) + on_push, on_push_ctx, format, stream_id, ...) local request, err = perform_async_request(buffer, skip_header, method, on_push, - on_push_ctx, format, ...) + on_push_ctx, format, stream_id, ...) if not request then return nil, err end @@ -710,7 +710,7 @@ local function create_transport(host, port, user, password, callback, log.warn("Netbox text protocol support is deprecated since 1.10, ".. "please use require('console').connect() instead") local setup_delimiter = 'require("console").delimiter("$EOF$")\n' - encode_method(M_INJECT, send_buf, nil, setup_delimiter) + encode_method(M_INJECT, send_buf, nil, nil, setup_delimiter) local err, response = send_and_recv_console() if err then return error_sm(err, response) @@ -744,7 +744,7 @@ local function create_transport(host, port, user, password, callback, set_state('fetch_schema') return iproto_schema_sm() end - encode_auth(send_buf, new_request_id(), user, password, salt) + encode_auth(send_buf, new_request_id(), nil, user, password, salt) local err, hdr, body_rpos = send_and_recv_iproto() if err then return error_sm(err, hdr) @@ -770,15 +770,15 @@ local function create_transport(host, port, user, password, callback, local select3_id local response = {} -- fetch everything from space _vspace, 2 = ITER_ALL - encode_method(M_SELECT, send_buf, select1_id, VSPACE_ID, 0, 2, 0, + encode_method(M_SELECT, send_buf, select1_id, nil, VSPACE_ID, 0, 2, 0, 0xFFFFFFFF, nil) -- fetch everything from space _vindex, 2 = ITER_ALL - encode_method(M_SELECT, send_buf, select2_id, VINDEX_ID, 0, 2, 0, + encode_method(M_SELECT, send_buf, select2_id, nil, VINDEX_ID, 0, 2, 0, 0xFFFFFFFF, nil) -- fetch everything from space _vcollation, 2 = ITER_ALL if peer_has_vcollation then select3_id = new_request_id() - encode_method(M_SELECT, send_buf, select3_id, VCOLLATION_ID, + encode_method(M_SELECT, send_buf, select3_id, nil, VCOLLATION_ID, 0, 2, 0, 0xFFFFFFFF, nil) end @@ -930,6 +930,8 @@ local function remote_serialize(self) } end +local stream_methods = {} + local remote_methods = {} local remote_mt = { __index = remote_methods, __serialize = remote_serialize, @@ -942,6 +944,90 @@ local console_mt = { __metatable = false } +local stream_index_mt = { + __index = function(self, key) + return self._src[key] + end +} + +-- Create stream space index, which is same as connection space +-- index, but have non zero stream ID. +local function stream_wrap_index(stream_id, src) + return setmetatable({ + _stream_id = stream_id, + _src = src, + }, stream_index_mt) +end + +-- Metatable for stream space indexes. When stream space being +-- created there are no indexes in it. When accessing the space +-- index, we look for corresponding space index in corresponding +-- connection space. If it is found we create same index for the +-- stream space but with corresponding stream ID. We do not need +-- to compare stream _schema_version and connection schema_version, +-- because all access to index is carried out through it's space. +-- So we update schema_version when we access space. +local stream_indexes_mt = { + __index = function(self, key) + local _space = self._space + local src = _space._src.index[key] + if not src then + return nil + end + local res = stream_wrap_index(_space._stream_id, src) + self[key] = res + return res + end +} + +local stream_space_mt = { + __index = function(self, key) + return self._src[key] + end +} + +-- Create stream space, which is same as connection space, +-- but have non zero stream ID. +local function stream_wrap_space(stream, src) + local res = setmetatable({ + _stream_id = stream._stream_id, + _src = src, + index = setmetatable({ + _space = nil, + }, stream_indexes_mt) + }, stream_space_mt) + res.index._space = res + return res +end + +-- Metatable for stream spaces. When stream being created there +-- are no spaces in it. When user try to access some space in +-- stream, we first of all compare _schema_version of stream with +-- schema_version from connection and if they are not equal, we +-- clear stream space cache and update it's schema_version. Then +-- we look for corresponding space in the connection. If it is +-- found we create same space for the stream but with corresponding +-- stream ID. +local stream_spaces_mt = { + __index = function(self, key) + local stream = self._stream + if stream._schema_version ~= stream._conn.schema_version then + stream._schema_version = stream._conn.schema_version + self._space = {} + end + if self._space[key] then + return self._space[key] + end + local src = stream._conn.space[key] + if not src then + return nil + end + local res = stream_wrap_space(stream, src) + self._space[key] = res + return res + end +} + local space_metatable, index_metatable local function new_sm(host, port, opts, connection, greeting) @@ -1021,6 +1107,9 @@ local function new_sm(host, port, opts, connection, greeting) if opts.wait_connected ~= false then remote._transport.wait_state('active', tonumber(opts.wait_connected)) end + -- Last stream ID used for this connection + remote._last_stream_id = 0 + remote._streams = setmetatable({}, {__mode = 'v'}) return remote end @@ -1078,6 +1167,29 @@ local function check_eval_args(args) end end +function stream_methods:new_stream() + check_remote_arg(self, 'stream') + box.error(E_PROC_LUA, "Unsupported for stream"); +end + +function remote_methods:new_stream() + check_remote_arg(self, 'stream') + self._last_stream_id = self._last_stream_id + 1 + local stream = setmetatable({ + new_stream = stream_methods.new_stream, + _stream_id = self._last_stream_id, + space = setmetatable({ + _space = {}, + _stream = nil, + }, stream_spaces_mt), + _conn = self, + _schema_version = self.schema_version, + }, { __index = self }) + stream.space._stream = stream + self._streams[self._last_stream_id] = stream + return stream +end + function remote_methods:close() check_remote_arg(self, 'close') self._transport.stop() @@ -1108,7 +1220,7 @@ function remote_methods:wait_connected(timeout) return self._transport.wait_state('active', timeout) end -function remote_methods:_request(method, opts, format, ...) +function remote_methods:_request(method, opts, format, stream_id, ...) local transport = self._transport local on_push, on_push_ctx, buffer, skip_header, deadline -- Extract options, set defaults, check if the request is @@ -1123,7 +1235,7 @@ function remote_methods:_request(method, opts, format, ...) local res, err = transport.perform_async_request(buffer, skip_header, method, table.insert, {}, format, - ...) + stream_id, ...) if err then box.error(err) end @@ -1145,7 +1257,7 @@ function remote_methods:_request(method, opts, format, ...) end local res, err = transport.perform_request(timeout, buffer, skip_header, method, on_push, on_push_ctx, - format, ...) + format, stream_id, ...) if err then box.error(err) end @@ -1161,7 +1273,7 @@ end function remote_methods:ping(opts) check_remote_arg(self, 'ping') - return (pcall(self._request, self, M_PING, opts)) + return (pcall(self._request, self, M_PING, opts, nil, self._stream_id)) end function remote_methods:reload_schema() @@ -1172,14 +1284,16 @@ end -- @deprecated since 1.7.4 function remote_methods:call_16(func_name, ...) check_remote_arg(self, 'call') - return (self:_request(M_CALL_16, nil, nil, tostring(func_name), {...})) + return (self:_request(M_CALL_16, nil, nil, self._stream_id, + tostring(func_name), {...})) end function remote_methods:call(func_name, args, opts) check_remote_arg(self, 'call') check_call_args(args) args = args or {} - local res = self:_request(M_CALL_17, opts, nil, tostring(func_name), args) + local res = self:_request(M_CALL_17, opts, nil, self._stream_id, + tostring(func_name), args) if type(res) ~= 'table' or opts and opts.is_async then return res end @@ -1189,14 +1303,15 @@ end -- @deprecated since 1.7.4 function remote_methods:eval_16(code, ...) check_remote_arg(self, 'eval') - return unpack((self:_request(M_EVAL, nil, nil, code, {...}))) + return unpack((self:_request(M_EVAL, nil, nil, self._stream_id, + code, {...}))) end function remote_methods:eval(code, args, opts) check_remote_arg(self, 'eval') check_eval_args(args) args = args or {} - local res = self:_request(M_EVAL, opts, nil, code, args) + local res = self:_request(M_EVAL, opts, nil, self._stream_id, code, args) if type(res) ~= 'table' or opts and opts.is_async then return res end @@ -1208,8 +1323,8 @@ function remote_methods:execute(query, parameters, sql_opts, netbox_opts) if sql_opts ~= nil then box.error(box.error.UNSUPPORTED, "execute", "options") end - return self:_request(M_EXECUTE, netbox_opts, nil, query, parameters or {}, - sql_opts or {}) + return self:_request(M_EXECUTE, netbox_opts, nil, self._stream_id, + query, parameters or {}, sql_opts or {}) end function remote_methods:prepare(query, parameters, sql_opts, netbox_opts) -- luacheck: no unused args @@ -1220,7 +1335,7 @@ function remote_methods:prepare(query, parameters, sql_opts, netbox_opts) -- lua if sql_opts ~= nil then box.error(box.error.UNSUPPORTED, "prepare", "options") end - return self:_request(M_PREPARE, netbox_opts, nil, query) + return self:_request(M_PREPARE, netbox_opts, nil, self._stream_id, query) end function remote_methods:unprepare(query, parameters, sql_opts, netbox_opts) @@ -1231,8 +1346,8 @@ function remote_methods:unprepare(query, parameters, sql_opts, netbox_opts) if sql_opts ~= nil then box.error(box.error.UNSUPPORTED, "unprepare", "options") end - return self:_request(M_UNPREPARE, netbox_opts, nil, query, parameters or {}, - sql_opts or {}) + return self:_request(M_UNPREPARE, netbox_opts, nil, self._stream_id, + query, parameters or {}, sql_opts or {}) end function remote_methods:wait_state(state, timeout) @@ -1370,11 +1485,11 @@ function console_methods:eval(line, timeout) end if self.protocol == 'Binary' then local loader = 'return require("console").eval(...)' - res, err = pr(timeout, nil, false, M_EVAL, nil, nil, nil, loader, + res, err = pr(timeout, nil, false, M_EVAL, nil, nil, nil, nil, loader, {line}) else assert(self.protocol == 'Lua console') - res, err = pr(timeout, nil, false, M_INJECT, nil, nil, nil, + res, err = pr(timeout, nil, false, M_INJECT, nil, nil, nil, nil, line..'$EOF$\n') end if err then @@ -1394,14 +1509,14 @@ space_metatable = function(remote) function methods:insert(tuple, opts) check_space_arg(self, 'insert') - return remote:_request(M_INSERT, opts, self._format_cdata, self.id, - tuple) + return remote:_request(M_INSERT, opts, self._format_cdata, + self._stream_id, self.id, tuple) end function methods:replace(tuple, opts) check_space_arg(self, 'replace') - return remote:_request(M_REPLACE, opts, self._format_cdata, self.id, - tuple) + return remote:_request(M_REPLACE, opts, self._format_cdata, + self._stream_id, self.id, tuple) end function methods:select(key, opts) @@ -1421,7 +1536,8 @@ space_metatable = function(remote) function methods:upsert(key, oplist, opts) check_space_arg(self, 'upsert') - return nothing_or_data(remote:_request(M_UPSERT, opts, nil, self.id, + return nothing_or_data(remote:_request(M_UPSERT, opts, nil, + self._stream_id, self.id, key, oplist)) end @@ -1452,8 +1568,8 @@ index_metatable = function(remote) local offset = tonumber(opts and opts.offset) or 0 local limit = tonumber(opts and opts.limit) or 0xFFFFFFFF return (remote:_request(M_SELECT, opts, self.space._format_cdata, - self.space.id, self.id, iterator, offset, - limit, key)) + self._stream_id, self.space.id, self.id, + iterator, offset, limit, key)) end function methods:get(key, opts) @@ -1463,6 +1579,7 @@ index_metatable = function(remote) end return nothing_or_data(remote:_request(M_GET, opts, self.space._format_cdata, + self._stream_id, self.space.id, self.id, box.index.EQ, 0, 2, key)) end @@ -1474,6 +1591,7 @@ index_metatable = function(remote) end return nothing_or_data(remote:_request(M_MIN, opts, self.space._format_cdata, + self._stream_id, self.space.id, self.id, box.index.GE, 0, 1, key)) end @@ -1485,6 +1603,7 @@ index_metatable = function(remote) end return nothing_or_data(remote:_request(M_MAX, opts, self.space._format_cdata, + self._stream_id, self.space.id, self.id, box.index.LE, 0, 1, key)) end @@ -1496,22 +1615,24 @@ index_metatable = function(remote) end local code = string.format('box.space.%s.index.%s:count', self.space.name, self.name) - return remote:_request(M_COUNT, opts, nil, code, { key, opts }) + return remote:_request(M_COUNT, opts, nil, self._stream_id, + code, { key, opts }) end function methods:delete(key, opts) check_index_arg(self, 'delete') return nothing_or_data(remote:_request(M_DELETE, opts, self.space._format_cdata, - self.space.id, self.id, key)) + self._stream_id, self.space.id, + self.id, key)) end function methods:update(key, oplist, opts) check_index_arg(self, 'update') return nothing_or_data(remote:_request(M_UPDATE, opts, self.space._format_cdata, - self.space.id, self.id, key, - oplist)) + self._stream_id, self.space.id, + self.id, key, oplist)) end return { __index = methods, __metatable = false } diff --git a/test/box/access.result b/test/box/access.result index 712cd68f8..6434da907 100644 --- a/test/box/access.result +++ b/test/box/access.result @@ -908,15 +908,15 @@ LISTEN = require('uri').parse(box.cfg.listen) c = net.connect(LISTEN.host, LISTEN.service) --- ... -c:_request(net._method.select, nil, nil, 1, box.index.EQ, 0, 0, 0xFFFFFFFF, {}) +c:_request(net._method.select, nil, nil, nil, 1, box.index.EQ, 0, 0, 0xFFFFFFFF, {}) --- - error: Space '1' does not exist ... -c:_request(net._method.select, nil, nil, 65537, box.index.EQ, 0, 0, 0xFFFFFFFF, {}) +c:_request(net._method.select, nil, nil, nil, 65537, box.index.EQ, 0, 0, 0xFFFFFFFF, {}) --- - error: Space '65537' does not exist ... -c:_request(net._method.select, nil, nil, 4294967295, box.index.EQ, 0, 0, 0xFFFFFFFF, {}) +c:_request(net._method.select, nil, nil, nil, 4294967295, box.index.EQ, 0, 0, 0xFFFFFFFF, {}) --- - error: Space '4294967295' does not exist ... diff --git a/test/box/access.test.lua b/test/box/access.test.lua index 6060475d1..6abdb780d 100644 --- a/test/box/access.test.lua +++ b/test/box/access.test.lua @@ -351,9 +351,9 @@ box.schema.func.drop(name) -- very large space id, no crash occurs. LISTEN = require('uri').parse(box.cfg.listen) c = net.connect(LISTEN.host, LISTEN.service) -c:_request(net._method.select, nil, nil, 1, box.index.EQ, 0, 0, 0xFFFFFFFF, {}) -c:_request(net._method.select, nil, nil, 65537, box.index.EQ, 0, 0, 0xFFFFFFFF, {}) -c:_request(net._method.select, nil, nil, 4294967295, box.index.EQ, 0, 0, 0xFFFFFFFF, {}) +c:_request(net._method.select, nil, nil, nil, 1, box.index.EQ, 0, 0, 0xFFFFFFFF, {}) +c:_request(net._method.select, nil, nil, nil, 65537, box.index.EQ, 0, 0, 0xFFFFFFFF, {}) +c:_request(net._method.select, nil, nil, nil, 4294967295, box.index.EQ, 0, 0, 0xFFFFFFFF, {}) c:close() session = box.session diff --git a/test/box/net.box_console_connections_gh-2677.result b/test/box/net.box_console_connections_gh-2677.result index f45aa0b56..7cea0a1da 100644 --- a/test/box/net.box_console_connections_gh-2677.result +++ b/test/box/net.box_console_connections_gh-2677.result @@ -74,7 +74,7 @@ c.space.test:delete{1} -- -- Break a connection to test reconnect_after. -- -_ = c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, '\x80') +_ = c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, nil, '\x80') --- ... while not c:is_connected() do fiber.sleep(0.01) end diff --git a/test/box/net.box_console_connections_gh-2677.test.lua b/test/box/net.box_console_connections_gh-2677.test.lua index 40d099e70..6c4e6ea4f 100644 --- a/test/box/net.box_console_connections_gh-2677.test.lua +++ b/test/box/net.box_console_connections_gh-2677.test.lua @@ -30,7 +30,7 @@ c.space.test:delete{1} -- -- Break a connection to test reconnect_after. -- -_ = c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, '\x80') +_ = c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, nil, '\x80') while not c:is_connected() do fiber.sleep(0.01) end c:ping() diff --git a/test/box/net.box_incorrect_iterator_gh-841.result b/test/box/net.box_incorrect_iterator_gh-841.result index fbd2a7700..cd2a86787 100644 --- a/test/box/net.box_incorrect_iterator_gh-841.result +++ b/test/box/net.box_incorrect_iterator_gh-841.result @@ -16,13 +16,13 @@ test_run:cmd("setopt delimiter ';'") - true ... function x_select(cn, space_id, index_id, iterator, offset, limit, key, opts) - local ret = cn:_request(remote._method.select, opts, nil, space_id, + local ret = cn:_request(remote._method.select, opts, nil, nil, space_id, index_id, iterator, offset, limit, key) return ret end function x_fatal(cn) cn._transport.perform_request(nil, nil, false, remote._method.inject, - nil, nil, nil, '\x80') + nil, nil, nil, nil, '\x80') end test_run:cmd("setopt delimiter ''"); --- diff --git a/test/box/net.box_incorrect_iterator_gh-841.test.lua b/test/box/net.box_incorrect_iterator_gh-841.test.lua index 1d24f9f56..9c42175ef 100644 --- a/test/box/net.box_incorrect_iterator_gh-841.test.lua +++ b/test/box/net.box_incorrect_iterator_gh-841.test.lua @@ -5,13 +5,13 @@ test_run:cmd("push filter ".."'\\.lua.*:[0-9]+: ' to '.lua...\"]:: '") test_run:cmd("setopt delimiter ';'") function x_select(cn, space_id, index_id, iterator, offset, limit, key, opts) - local ret = cn:_request(remote._method.select, opts, nil, space_id, + local ret = cn:_request(remote._method.select, opts, nil, nil, space_id, index_id, iterator, offset, limit, key) return ret end function x_fatal(cn) cn._transport.perform_request(nil, nil, false, remote._method.inject, - nil, nil, nil, '\x80') + nil, nil, nil, nil, '\x80') end test_run:cmd("setopt delimiter ''"); diff --git a/test/box/net.box_iproto_hangs_gh-3464.result b/test/box/net.box_iproto_hangs_gh-3464.result index 3b5458c9a..cbf8181b3 100644 --- a/test/box/net.box_iproto_hangs_gh-3464.result +++ b/test/box/net.box_iproto_hangs_gh-3464.result @@ -17,7 +17,7 @@ c = net:connect(box.cfg.listen) data = msgpack.encode(18400000000000000000)..'aaaaaaa' --- ... -c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, data) +c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, nil, data) --- - null - Peer closed diff --git a/test/box/net.box_iproto_hangs_gh-3464.test.lua b/test/box/net.box_iproto_hangs_gh-3464.test.lua index a7c41ae76..51a9ddece 100644 --- a/test/box/net.box_iproto_hangs_gh-3464.test.lua +++ b/test/box/net.box_iproto_hangs_gh-3464.test.lua @@ -8,6 +8,6 @@ net = require('net.box') -- c = net:connect(box.cfg.listen) data = msgpack.encode(18400000000000000000)..'aaaaaaa' -c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, data) +c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, nil, data) c:close() test_run:grep_log('default', 'too big packet size in the header') ~= nil diff --git a/test/box/net.box_long-poll_input_gh-3400.result b/test/box/net.box_long-poll_input_gh-3400.result index a16110ee6..a98eea655 100644 --- a/test/box/net.box_long-poll_input_gh-3400.result +++ b/test/box/net.box_long-poll_input_gh-3400.result @@ -24,10 +24,10 @@ c:ping() -- new attempts to read any data - the connection is closed -- already. -- -f = fiber.create(c._transport.perform_request, nil, nil, false, \ - net._method.call_17, nil, nil, nil, 'long', {}) \ -c._transport.perform_request(nil, nil, false, net._method.inject, \ - nil, nil, nil, '\x80') +f = fiber.create(c._transport.perform_request, nil, nil, false, \ + net._method.call_17, nil, nil, nil, nil, 'long', {}) \ +c._transport.perform_request(nil, nil, false, net._method.inject, \ + nil, nil, nil, nil, '\x80') --- ... while f:status() ~= 'dead' do fiber.sleep(0.01) end diff --git a/test/box/net.box_long-poll_input_gh-3400.test.lua b/test/box/net.box_long-poll_input_gh-3400.test.lua index 891b59224..a6f302ee0 100644 --- a/test/box/net.box_long-poll_input_gh-3400.test.lua +++ b/test/box/net.box_long-poll_input_gh-3400.test.lua @@ -14,9 +14,9 @@ c:ping() -- new attempts to read any data - the connection is closed -- already. -- -f = fiber.create(c._transport.perform_request, nil, nil, false, \ - net._method.call_17, nil, nil, nil, 'long', {}) \ -c._transport.perform_request(nil, nil, false, net._method.inject, \ - nil, nil, nil, '\x80') +f = fiber.create(c._transport.perform_request, nil, nil, false, \ + net._method.call_17, nil, nil, nil, nil, 'long', {}) \ +c._transport.perform_request(nil, nil, false, net._method.inject, \ + nil, nil, nil, nil, '\x80') while f:status() ~= 'dead' do fiber.sleep(0.01) end c:close() diff --git a/test/box/stream.lua b/test/box/stream.lua new file mode 100644 index 000000000..db6a29a8a --- /dev/null +++ b/test/box/stream.lua @@ -0,0 +1,13 @@ +#!/usr/bin/env tarantool + +require('console').listen(os.getenv('ADMIN')) + +local memtx_use_mvcc_engine = (arg[2] and arg[2] == 'true' and true or false) + +box.cfg({ + listen = os.getenv('LISTEN'), + iproto_threads = tonumber(arg[1]), + memtx_use_mvcc_engine = memtx_use_mvcc_engine +}) + +box.schema.user.grant('guest', 'read,write,execute,create,drop', 'universe', nil, {if_not_exists = true}) diff --git a/test/box/stream.result b/test/box/stream.result new file mode 100644 index 000000000..bfcf6c6be --- /dev/null +++ b/test/box/stream.result @@ -0,0 +1,553 @@ +-- test-run result file version 2 +-- This test checks streams iplementation in iproto (gh-5860). +net_box = require('net.box') + | --- + | ... +fiber = require('fiber') + | --- + | ... +test_run = require('test_run').new() + | --- + | ... + +test_run:cmd("create server test with script='box/stream.lua'") + | --- + | - true + | ... + +test_run:cmd("setopt delimiter ';'") + | --- + | - true + | ... +function get_current_connection_count() + local total_net_stat_table = + test_run:cmd(string.format("eval test 'return box.stat.net()'"))[1] + assert(total_net_stat_table) + local connection_stat_table = total_net_stat_table.CONNECTIONS + assert(connection_stat_table) + return connection_stat_table.current +end; + | --- + | ... +function wait_and_return_results(futures) + local results = {} + for name, future in pairs(futures) do + local err + results[name], err = future:wait_result() + if err then + results[name] = err + end + end + return results +end; + | --- + | ... +test_run:cmd("setopt delimiter ''"); + | --- + | - true + | ... + +-- Some simple checks for new object - stream +test_run:cmd("start server test with args='1'") + | --- + | - true + | ... +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] + | --- + | ... +conn_1 = net_box.connect(server_addr) + | --- + | ... +stream_1 = conn_1:new_stream() + | --- + | ... +conn_2 = net_box.connect(server_addr) + | --- + | ... +stream_2 = conn_2:new_stream() + | --- + | ... +-- Stream is a wrapper around connection, so if you close connection +-- you close stream, and vice versa. +conn_1:close() + | --- + | ... +assert(not stream_1:ping()) + | --- + | - true + | ... +stream_2:close() + | --- + | ... +assert(not conn_2:ping()) + | --- + | - true + | ... +-- new_stream method unsupported for stream +conn = net_box.connect(server_addr) + | --- + | ... +stream = conn:new_stream() + | --- + | ... +-- Unsupported for stream +stream:new_stream() + | --- + | - error: Unsupported for stream + | ... +conn:close() + | --- + | ... + +-- Check that spaces in stream object updates, during reload_schema +conn = net_box.connect(server_addr) + | --- + | ... +stream = conn:new_stream() + | --- + | ... +test_run:switch("test") + | --- + | - true + | ... +-- Create one space on server +s = box.schema.space.create('test', { engine = 'memtx' }) + | --- + | ... +_ = s:create_index('primary') + | --- + | ... +test_run:switch("default") + | --- + | - true + | ... +assert(not conn.space.test) + | --- + | - true + | ... +assert(not stream.space.test) + | --- + | - true + | ... +assert(conn.schema_version == stream._schema_version) + | --- + | - true + | ... +conn:reload_schema() + | --- + | ... +assert(conn.space.test ~= nil) + | --- + | - true + | ... +assert(conn.schema_version ~= stream._schema_version) + | --- + | - true + | ... +assert(stream.space.test ~= nil) + | --- + | - true + | ... +-- When we touch stream.space, we compare stream._schema_version +-- and conn.schema_version if they are not equal, we clear stream +-- space cache, update it's _schema_version and load space from +-- connection to stream space cache. +assert(conn.schema_version == stream._schema_version) + | --- + | - true + | ... +collectgarbage() + | --- + | - 0 + | ... +collectgarbage() + | --- + | - 0 + | ... +assert(conn.space.test ~= nil) + | --- + | - true + | ... +assert(stream.space.test ~= nil) + | --- + | - true + | ... +test_run:switch("test") + | --- + | - true + | ... +s:drop() + | --- + | ... +test_run:switch("default") + | --- + | - true + | ... +conn:reload_schema() + | --- + | ... +assert(not conn.space.test) + | --- + | - true + | ... +assert(not stream.space.test) + | --- + | - true + | ... +test_run:cmd("stop server test") + | --- + | - true + | ... + +-- All test works with iproto_thread count = 10 + +test_run:cmd("start server test with args='10'") + | --- + | - true + | ... +test_run:switch('test') + | --- + | - true + | ... +fiber = require('fiber') + | --- + | ... +s = box.schema.space.create('test', { engine = 'memtx' }) + | --- + | ... +_ = s:create_index('primary') + | --- + | ... +test_run:cmd("setopt delimiter ';'") + | --- + | - true + | ... +function replace_with_yeild(item) + fiber.sleep(0.1) + return s:replace({item}) +end; + | --- + | ... +test_run:cmd("setopt delimiter ''"); + | --- + | - true + | ... +test_run:switch('default') + | --- + | - true + | ... + +conn = net_box.connect(server_addr) + | --- + | ... +assert(conn:ping()) + | --- + | - true + | ... +conn_space = conn.space.test + | --- + | ... +stream = conn:new_stream() + | --- + | ... +stream_space = stream.space.test + | --- + | ... + +-- Check that all requests in stream processed consistently +futures = {} + | --- + | ... +replace_count = 3 + | --- + | ... +test_run:cmd("setopt delimiter ';'") + | --- + | - true + | ... +for i = 1, replace_count do + futures[string.format("replace_%d", i)] = + stream_space:replace({i}, {is_async = true}) + futures[string.format("select_%d", i)] = + stream_space:select({}, {is_async = true}) +end; + | --- + | ... +futures["replace_with_yeild_for_stream"] = + stream:call("replace_with_yeild", + { replace_count + 1 }, {is_async = true}); + | --- + | ... +futures["select_with_yeild_for_stream"] = + stream_space:select({}, {is_async = true}); + | --- + | ... +test_run:cmd("setopt delimiter ''"); + | --- + | - true + | ... +results = wait_and_return_results(futures) + | --- + | ... +-- [1] +assert(results["select_1"]) + | --- + | - - [1] + | ... +-- [1] [2] +assert(results["select_2"]) + | --- + | - - [1] + | - [2] + | ... +-- [1] [2] [3] +assert(results["select_3"]) + | --- + | - - [1] + | - [2] + | - [3] + | ... +-- [1] [2] [3] [4] +-- Even yeild in replace function does not affect +-- the order of requests execution in stream +assert(results["select_with_yeild_for_stream"]) + | --- + | - - [1] + | - [2] + | - [3] + | - [4] + | ... + +-- There is no request execution order for the connection +futures = {} + | --- + | ... +test_run:cmd("setopt delimiter ';'") + | --- + | - true + | ... +futures["replace_with_yeild_for_connection"] = + conn:call("replace_with_yeild", { replace_count + 2 }, {is_async = true}); + | --- + | ... +futures["select_with_yeild_for_connection"] = + conn_space:select({}, {is_async = true}); + | --- + | ... +test_run:cmd("setopt delimiter ''"); + | --- + | - true + | ... +results = wait_and_return_results(futures) + | --- + | ... +-- [1] [2] [3] [4] +-- Select will be processed earlier because of +-- yeild in `replace_with_yeild` function +assert(results["select_with_yeild_for_connection"]) + | --- + | - - [1] + | - [2] + | - [3] + | - [4] + | ... +test_run:switch("test") + | --- + | - true + | ... +-- [1] [2] [3] [4] [5] +s:select() + | --- + | - - [1] + | - [2] + | - [3] + | - [4] + | - [5] + | ... +errinj = box.error.injection + | --- + | ... +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) + | --- + | - true + | ... +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) + | --- + | - true + | ... +test_run:switch('default') + | --- + | - true + | ... +conn:close() + | --- + | ... +test_run:wait_cond(function () return get_current_connection_count() == 0 end) + | --- + | - true + | ... +-- Сheck that stream object is not leak +conn = net_box.connect(server_addr) + | --- + | ... +assert(conn:ping()) + | --- + | - true + | ... +test_run:cmd("setopt delimiter ';'") + | --- + | - true + | ... +streams = {} +spaces = {} +conns = setmetatable({ conn }, {__mode = 'v'}) +count = 10 +for i = 1, count do + streams[i] = conn:new_stream() + spaces[i] = streams[i].space.test + assert(spaces[i]) +end; + | --- + | ... +test_run:cmd("setopt delimiter ''"); + | --- + | - true + | ... +assert(#conn._streams == count) + | --- + | - true + | ... +spaces = nil + | --- + | ... +streams = nil + | --- + | ... +collectgarbage() + | --- + | - 0 + | ... +assert(#conn._streams == 0) + | --- + | - true + | ... +conn:close() + | --- + | ... +test_run:wait_cond(function () return get_current_connection_count() == 0 end) + | --- + | - true + | ... +assert(#conns == 1) + | --- + | - true + | ... +conn = nil + | --- + | ... +collectgarbage() + | --- + | - 0 + | ... +assert(#conns == 0) + | --- + | - true + | ... + +-- Check that all request will be processed +-- after connection close. +conn = net_box.connect(server_addr) + | --- + | ... +stream = conn:new_stream() + | --- + | ... +space = stream.space.test + | --- + | ... +test_run:cmd("setopt delimiter ';'") + | --- + | - true + | ... +replace_count = 20 +for i = 1, replace_count do + space:replace({i}, {is_async = true}) +end; + | --- + | ... +test_run:cmd("setopt delimiter ''"); + | --- + | - true + | ... +-- Give time to send +fiber.sleep(0) + | --- + | ... +conn:close() + | --- + | ... +test_run:wait_cond(function () return get_current_connection_count() == 0 end) + | --- + | - true + | ... +test_run:switch("test") + | --- + | - true + | ... +-- select return tuples from [1] to [20] +-- because all messages processed after +-- connection closed +s:select{} + | --- + | - - [1] + | - [2] + | - [3] + | - [4] + | - [5] + | - [6] + | - [7] + | - [8] + | - [9] + | - [10] + | - [11] + | - [12] + | - [13] + | - [14] + | - [15] + | - [16] + | - [17] + | - [18] + | - [19] + | - [20] + | ... +s:drop() + | --- + | ... +errinj = box.error.injection + | --- + | ... +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) + | --- + | - true + | ... +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) + | --- + | - true + | ... +test_run:switch("default") + | --- + | - true + | ... +test_run:cmd("stop server test") + | --- + | - true + | ... + +test_run:cmd("cleanup server test") + | --- + | - true + | ... +test_run:cmd("delete server test") + | --- + | - true + | ... diff --git a/test/box/stream.test.lua b/test/box/stream.test.lua new file mode 100644 index 000000000..190f17d8e --- /dev/null +++ b/test/box/stream.test.lua @@ -0,0 +1,207 @@ +-- This test checks streams iplementation in iproto (gh-5860). +net_box = require('net.box') +fiber = require('fiber') +test_run = require('test_run').new() + +test_run:cmd("create server test with script='box/stream.lua'") + +test_run:cmd("setopt delimiter ';'") +function get_current_connection_count() + local total_net_stat_table = + test_run:cmd(string.format("eval test 'return box.stat.net()'"))[1] + assert(total_net_stat_table) + local connection_stat_table = total_net_stat_table.CONNECTIONS + assert(connection_stat_table) + return connection_stat_table.current +end; +function wait_and_return_results(futures) + local results = {} + for name, future in pairs(futures) do + local err + results[name], err = future:wait_result() + if err then + results[name] = err + end + end + return results +end; +test_run:cmd("setopt delimiter ''"); + +-- Some simple checks for new object - stream +test_run:cmd("start server test with args='1'") +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] +conn_1 = net_box.connect(server_addr) +stream_1 = conn_1:new_stream() +conn_2 = net_box.connect(server_addr) +stream_2 = conn_2:new_stream() +-- Stream is a wrapper around connection, so if you close connection +-- you close stream, and vice versa. +conn_1:close() +assert(not stream_1:ping()) +stream_2:close() +assert(not conn_2:ping()) +-- new_stream method unsupported for stream +conn = net_box.connect(server_addr) +stream = conn:new_stream() +-- Unsupported for stream +stream:new_stream() +conn:close() + +-- Check that spaces in stream object updates, during reload_schema +conn = net_box.connect(server_addr) +stream = conn:new_stream() +test_run:switch("test") +-- Create one space on server +s = box.schema.space.create('test', { engine = 'memtx' }) +_ = s:create_index('primary') +test_run:switch("default") +assert(not conn.space.test) +assert(not stream.space.test) +assert(conn.schema_version == stream._schema_version) +conn:reload_schema() +assert(conn.space.test ~= nil) +assert(conn.schema_version ~= stream._schema_version) +assert(stream.space.test ~= nil) +-- When we touch stream.space, we compare stream._schema_version +-- and conn.schema_version if they are not equal, we clear stream +-- space cache, update it's _schema_version and load space from +-- connection to stream space cache. +assert(conn.schema_version == stream._schema_version) +collectgarbage() +collectgarbage() +assert(conn.space.test ~= nil) +assert(stream.space.test ~= nil) +test_run:switch("test") +s:drop() +test_run:switch("default") +conn:reload_schema() +assert(not conn.space.test) +assert(not stream.space.test) +test_run:cmd("stop server test") + +-- All test works with iproto_thread count = 10 + +test_run:cmd("start server test with args='10'") +test_run:switch('test') +fiber = require('fiber') +s = box.schema.space.create('test', { engine = 'memtx' }) +_ = s:create_index('primary') +test_run:cmd("setopt delimiter ';'") +function replace_with_yeild(item) + fiber.sleep(0.1) + return s:replace({item}) +end; +test_run:cmd("setopt delimiter ''"); +test_run:switch('default') + +conn = net_box.connect(server_addr) +assert(conn:ping()) +conn_space = conn.space.test +stream = conn:new_stream() +stream_space = stream.space.test + +-- Check that all requests in stream processed consistently +futures = {} +replace_count = 3 +test_run:cmd("setopt delimiter ';'") +for i = 1, replace_count do + futures[string.format("replace_%d", i)] = + stream_space:replace({i}, {is_async = true}) + futures[string.format("select_%d", i)] = + stream_space:select({}, {is_async = true}) +end; +futures["replace_with_yeild_for_stream"] = + stream:call("replace_with_yeild", + { replace_count + 1 }, {is_async = true}); +futures["select_with_yeild_for_stream"] = + stream_space:select({}, {is_async = true}); +test_run:cmd("setopt delimiter ''"); +results = wait_and_return_results(futures) +-- [1] +assert(results["select_1"]) +-- [1] [2] +assert(results["select_2"]) +-- [1] [2] [3] +assert(results["select_3"]) +-- [1] [2] [3] [4] +-- Even yeild in replace function does not affect +-- the order of requests execution in stream +assert(results["select_with_yeild_for_stream"]) + +-- There is no request execution order for the connection +futures = {} +test_run:cmd("setopt delimiter ';'") +futures["replace_with_yeild_for_connection"] = + conn:call("replace_with_yeild", { replace_count + 2 }, {is_async = true}); +futures["select_with_yeild_for_connection"] = + conn_space:select({}, {is_async = true}); +test_run:cmd("setopt delimiter ''"); +results = wait_and_return_results(futures) +-- [1] [2] [3] [4] +-- Select will be processed earlier because of +-- yeild in `replace_with_yeild` function +assert(results["select_with_yeild_for_connection"]) +test_run:switch("test") +-- [1] [2] [3] [4] [5] +s:select() +errinj = box.error.injection +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +test_run:switch('default') +conn:close() +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +-- Сheck that stream object is not leak +conn = net_box.connect(server_addr) +assert(conn:ping()) +test_run:cmd("setopt delimiter ';'") +streams = {} +spaces = {} +conns = setmetatable({ conn }, {__mode = 'v'}) +count = 10 +for i = 1, count do + streams[i] = conn:new_stream() + spaces[i] = streams[i].space.test + assert(spaces[i]) +end; +test_run:cmd("setopt delimiter ''"); +assert(#conn._streams == count) +spaces = nil +streams = nil +collectgarbage() +assert(#conn._streams == 0) +conn:close() +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +assert(#conns == 1) +conn = nil +collectgarbage() +assert(#conns == 0) + +-- Check that all request will be processed +-- after connection close. +conn = net_box.connect(server_addr) +stream = conn:new_stream() +space = stream.space.test +test_run:cmd("setopt delimiter ';'") +replace_count = 20 +for i = 1, replace_count do + space:replace({i}, {is_async = true}) +end; +test_run:cmd("setopt delimiter ''"); +-- Give time to send +fiber.sleep(0) +conn:close() +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +test_run:switch("test") +-- select return tuples from [1] to [20] +-- because all messages processed after +-- connection closed +s:select{} +s:drop() +errinj = box.error.injection +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +test_run:switch("default") +test_run:cmd("stop server test") + +test_run:cmd("cleanup server test") +test_run:cmd("delete server test") diff --git a/test/box/suite.ini b/test/box/suite.ini index b5d869fb3..94cf7811f 100644 --- a/test/box/suite.ini +++ b/test/box/suite.ini @@ -5,7 +5,7 @@ script = box.lua disabled = rtree_errinj.test.lua tuple_bench.test.lua long_run = huge_field_map_long.test.lua config = engine.cfg -release_disabled = errinj.test.lua errinj_index.test.lua rtree_errinj.test.lua upsert_errinj.test.lua iproto_stress.test.lua gh-4648-func-load-unload.test.lua gh-5645-several-iproto-threads.test.lua net.box_discard_console_request_gh-6249.test.lua +release_disabled = errinj.test.lua errinj_index.test.lua rtree_errinj.test.lua upsert_errinj.test.lua iproto_stress.test.lua gh-4648-func-load-unload.test.lua gh-5645-several-iproto-threads.test.lua net.box_discard_console_request_gh-6249.test.lua stream.test.lua lua_libs = lua/fifo.lua lua/utils.lua lua/bitset.lua lua/index_random_test.lua lua/push.lua lua/identifier.lua lua/txn_proxy.lua use_unix_sockets = True use_unix_sockets_iproto = True -- 2.20.1