From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from [87.239.111.99] (localhost [127.0.0.1]) by dev.tarantool.org (Postfix) with ESMTP id BAA776EC40; Wed, 11 Aug 2021 14:52:10 +0300 (MSK) DKIM-Filter: OpenDKIM Filter v2.11.0 dev.tarantool.org BAA776EC40 DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=tarantool.org; s=dev; t=1628682730; bh=UPvrkxm7rpcFg6PHwu8TKp5Y1sJxaOhecVaMA74dHw4=; h=Date:To:Cc:References:In-Reply-To:Subject:List-Id: List-Unsubscribe:List-Archive:List-Post:List-Help:List-Subscribe: From:Reply-To:From; b=FeFBpymNqaDgQXX1AnGBAWLBXcrhtvn0LCDUcqGb+OR5rKTvz2G5zkbTywYc1TIrE PBZAO9MxfqkQvG14f8YUVmTVoyXWRKbKE/TnwCGeJsjGyek5fam7iUC8RD2ODT52sT u4yaZ62DtnPzrUgoAefikaIOXF95r4GrgRw8RrGQ= Received: from smtpng2.i.mail.ru (smtpng2.i.mail.ru [94.100.179.3]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id 4AB786EC40 for ; Wed, 11 Aug 2021 14:52:09 +0300 (MSK) DKIM-Filter: OpenDKIM Filter v2.11.0 dev.tarantool.org 4AB786EC40 Received: by smtpng2.m.smailru.net with esmtpa (envelope-from ) id 1mDmmG-0006XK-9c; Wed, 11 Aug 2021 14:52:08 +0300 Date: Wed, 11 Aug 2021 14:52:05 +0300 To: mechanik20051988 Cc: tarantool-patches@dev.tarantool.org, v.shpilevoy@tarantool.org, mechanik20051988 Message-ID: <20210811115205.hkhbui3zzeu63544@esperanza> References: <56db4eea5c868187e151658427e1a6d65736771a.1628671235.git.mechanik20051988@tarantool.org> MIME-Version: 1.0 Content-Type: text/plain; charset=us-ascii Content-Disposition: inline In-Reply-To: <56db4eea5c868187e151658427e1a6d65736771a.1628671235.git.mechanik20051988@tarantool.org> X-4EC0790: 10 X-7564579A: 646B95376F6C166E X-77F55803: 4F1203BC0FB41BD92087353F0EC44DD906AB4890CDABF0C5CB76CEE71D3E4007182A05F538085040C27A1F5559779BD33502F49889E830B49347268A2A1FEC80917AD2D5198888EE X-7FA49CB5: FF5795518A3D127A4AD6D5ED66289B5278DA827A17800CE712EB008F780777E9EA1F7E6F0F101C67BD4B6F7A4D31EC0BCC500DACC3FED6E28638F802B75D45FF8AA50765F79006378D70459430292EC88638F802B75D45FF36EB9D2243A4F8B5A6FCA7DBDB1FC311F39EFFDF887939037866D6147AF826D825999C2A0742D6597BC899EF438DB21B117882F4460429724CE54428C33FAD305F5C1EE8F4F765FC8C7ADC89C2F0B2A5A471835C12D1D9774AD6D5ED66289B52BA9C0B312567BB23117882F44604297287769387670735201E561CDFBCA1751FF6B57BC7E6449061A352F6E88A58FB86F5D81C698A659EA7E827F84554CEF5019E625A9149C048EE9ECD01F8117BC8BEE2021AF6380DFAD18AA50765F790063735872C767BF85DA227C277FBC8AE2E8BDCE939D40DBB93CA75ECD9A6C639B01B4E70A05D1297E1BBCB5012B2E24CD356 X-C1DE0DAB: 0D63561A33F958A59E16C393219F6AAEC3C4A4C33223147B8FD2511C3ABEE29CD59269BC5F550898D99A6476B3ADF6B47008B74DF8BB9EF7333BD3B22AA88B938A852937E12ACA753177526CD55AFC11410CA545F18667F91A7EA1CDA0B5A7A0 X-C8649E89: 4E36BF7865823D7055A7F0CF078B5EC49A30900B95165D340A4C04F5DECA7EE9841BB390B0B369381313E2CB1E637A3DFF3C1A6A95BD97276AAEC179E99119291D7E09C32AA3244C6C75BB90B9882C56E8786A0FE14D2984A8CE788DE683120583B48618A63566E0 X-D57D3AED: 3ZO7eAau8CL7WIMRKs4sN3D3tLDjz0dLbV79QFUyzQ2Ujvy7cMT6pYYqY16iZVKkSc3dCLJ7zSJH7+u4VD18S7Vl4ZUrpaVfd2+vE6kuoey4m4VkSEu530nj6fImhcD4MUrOEAnl0W826KZ9Q+tr5ycPtXkTV4k65bRjmOUUP8cvGozZ33TWg5HZplvhhXbhDGzqmQDTd6OAevLeAnq3Ra9uf7zvY2zzsIhlcp/Y7m53TZgf2aB4JOg4gkr2bioj6qlzQV0oSZPwPgY3wYVevA== X-Mailru-Sender: 689FA8AB762F7393C37E3C1AEC41BA5DF5A2CA2D7C453A68C41AC6F91C67E061274CEFED1673C562683ABF942079399BFB559BB5D741EB966A65DFF43FF7BE03240331F90058701C67EA787935ED9F1B X-Mras: Ok Subject: Re: [Tarantool-patches] [PATCH v3 6/8] net.box: add stream support to net.box X-BeenThere: tarantool-patches@dev.tarantool.org X-Mailman-Version: 2.1.34 Precedence: list List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , From: Vladimir Davydov via Tarantool-patches Reply-To: Vladimir Davydov Errors-To: tarantool-patches-bounces@dev.tarantool.org Sender: "Tarantool-patches" On Wed, Aug 11, 2021 at 11:56:56AM +0300, mechanik20051988 wrote: > diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua > index 8f5671c15..3dffc245f 100644 > --- a/src/box/lua/net_box.lua > +++ b/src/box/lua/net_box.lua > @@ -635,6 +749,28 @@ local function check_eval_args(args) > end > end > > +local function new_stream(stream) Let's rename this function to stream_new_stream to emphasize that it's a stream method. > + check_remote_arg(stream, 'new_stream') > + return stream._conn:new_stream() > +end > + > +function remote_methods:new_stream() > + check_remote_arg(self, 'new_stream') > + self._last_stream_id = self._last_stream_id + 1 > + local stream = setmetatable({ > + new_stream = new_stream, > + _stream_id = self._last_stream_id, > + space = setmetatable({ > + _stream_space_cache = {}, > + _stream = nil, > + }, stream_spaces_mt), > + _conn = self, > + _schema_version = self.schema_version, > + }, { __index = self, __serialize = stream_serialize }) > + stream.space._stream = stream > + return stream > +end > + > function remote_methods:close() > check_remote_arg(self, 'close') > self._transport.stop() > @@ -665,7 +801,7 @@ function remote_methods:wait_connected(timeout) > return self._transport.wait_state('active', timeout) > end > > -function remote_methods:_request(method, opts, format, ...) > +function remote_methods:_request(method, opts, format, stream_id, ...) > local transport = self._transport > local on_push, on_push_ctx, buffer, skip_header, deadline > -- Extract options, set defaults, check if the request is > @@ -680,7 +816,7 @@ function remote_methods:_request(method, opts, format, ...) > local res, err = > transport.perform_async_request(buffer, skip_header, method, > table.insert, {}, format, > - ...) > + stream_id, ...) > if err then > box.error(err) > end > @@ -702,7 +838,7 @@ function remote_methods:_request(method, opts, format, ...) > end > local res, err = transport.perform_request(timeout, buffer, skip_header, > method, on_push, on_push_ctx, > - format, ...) > + format, stream_id, ...) Please use self._stream_id here. Then you won't need to pass stream_id from all the functions below. > if err then > box.error(err) > end > @@ -718,7 +854,7 @@ end > > function remote_methods:ping(opts) > check_remote_arg(self, 'ping') > - return (pcall(self._request, self, M_PING, opts)) > + return (pcall(self._request, self, M_PING, opts, nil, self._stream_id)) > end > > function remote_methods:reload_schema() > @@ -729,14 +865,16 @@ end > -- @deprecated since 1.7.4 > function remote_methods:call_16(func_name, ...) > check_remote_arg(self, 'call') > - return (self:_request(M_CALL_16, nil, nil, tostring(func_name), {...})) > + return (self:_request(M_CALL_16, nil, nil, self._stream_id, > + tostring(func_name), {...})) > end > > function remote_methods:call(func_name, args, opts) > check_remote_arg(self, 'call') > check_call_args(args) > args = args or {} > - local res = self:_request(M_CALL_17, opts, nil, tostring(func_name), args) > + local res = self:_request(M_CALL_17, opts, nil, self._stream_id, > + tostring(func_name), args) > if type(res) ~= 'table' or opts and opts.is_async then > return res > end > @@ -746,14 +884,15 @@ end > -- @deprecated since 1.7.4 > function remote_methods:eval_16(code, ...) > check_remote_arg(self, 'eval') > - return unpack((self:_request(M_EVAL, nil, nil, code, {...}))) > + return unpack((self:_request(M_EVAL, nil, nil, self._stream_id, > + code, {...}))) > end > > function remote_methods:eval(code, args, opts) > check_remote_arg(self, 'eval') > check_eval_args(args) > args = args or {} > - local res = self:_request(M_EVAL, opts, nil, code, args) > + local res = self:_request(M_EVAL, opts, nil, self._stream_id, code, args) > if type(res) ~= 'table' or opts and opts.is_async then > return res > end > @@ -765,8 +904,8 @@ function remote_methods:execute(query, parameters, sql_opts, netbox_opts) > if sql_opts ~= nil then > box.error(box.error.UNSUPPORTED, "execute", "options") > end > - return self:_request(M_EXECUTE, netbox_opts, nil, query, parameters or {}, > - sql_opts or {}) > + return self:_request(M_EXECUTE, netbox_opts, nil, self._stream_id, > + query, parameters or {}, sql_opts or {}) > end > > function remote_methods:prepare(query, parameters, sql_opts, netbox_opts) -- luacheck: no unused args > @@ -777,7 +916,7 @@ function remote_methods:prepare(query, parameters, sql_opts, netbox_opts) -- lua > if sql_opts ~= nil then > box.error(box.error.UNSUPPORTED, "prepare", "options") > end > - return self:_request(M_PREPARE, netbox_opts, nil, query) > + return self:_request(M_PREPARE, netbox_opts, nil, self._stream_id, query) > end > > function remote_methods:unprepare(query, parameters, sql_opts, netbox_opts) > @@ -788,8 +927,8 @@ function remote_methods:unprepare(query, parameters, sql_opts, netbox_opts) > if sql_opts ~= nil then > box.error(box.error.UNSUPPORTED, "unprepare", "options") > end > - return self:_request(M_UNPREPARE, netbox_opts, nil, query, parameters or {}, > - sql_opts or {}) > + return self:_request(M_UNPREPARE, netbox_opts, nil, self._stream_id, > + query, parameters or {}, sql_opts or {}) > end > > function remote_methods:wait_state(state, timeout) > @@ -927,11 +1066,11 @@ function console_methods:eval(line, timeout) > end > if self.protocol == 'Binary' then > local loader = 'return require("console").eval(...)' > - res, err = pr(timeout, nil, false, M_EVAL, nil, nil, nil, loader, > + res, err = pr(timeout, nil, false, M_EVAL, nil, nil, nil, nil, loader, > {line}) > else > assert(self.protocol == 'Lua console') > - res, err = pr(timeout, nil, false, M_INJECT, nil, nil, nil, > + res, err = pr(timeout, nil, false, M_INJECT, nil, nil, nil, nil, > line..'$EOF$\n') > end > if err then > @@ -951,14 +1090,14 @@ space_metatable = function(remote) > > function methods:insert(tuple, opts) > check_space_arg(self, 'insert') > - return remote:_request(M_INSERT, opts, self._format_cdata, self.id, > - tuple) > + return remote:_request(M_INSERT, opts, self._format_cdata, > + self._stream_id, self.id, tuple) > end > > function methods:replace(tuple, opts) > check_space_arg(self, 'replace') > - return remote:_request(M_REPLACE, opts, self._format_cdata, self.id, > - tuple) > + return remote:_request(M_REPLACE, opts, self._format_cdata, > + self._stream_id, self.id, tuple) > end > > function methods:select(key, opts) > @@ -978,7 +1117,8 @@ space_metatable = function(remote) > > function methods:upsert(key, oplist, opts) > check_space_arg(self, 'upsert') > - return nothing_or_data(remote:_request(M_UPSERT, opts, nil, self.id, > + return nothing_or_data(remote:_request(M_UPSERT, opts, nil, > + self._stream_id, self.id, > key, oplist)) > end > > @@ -1009,8 +1149,8 @@ index_metatable = function(remote) > local offset = tonumber(opts and opts.offset) or 0 > local limit = tonumber(opts and opts.limit) or 0xFFFFFFFF > return (remote:_request(M_SELECT, opts, self.space._format_cdata, > - self.space.id, self.id, iterator, offset, > - limit, key)) > + self._stream_id, self.space.id, self.id, > + iterator, offset, limit, key)) > end > > function methods:get(key, opts) > @@ -1020,6 +1160,7 @@ index_metatable = function(remote) > end > return nothing_or_data(remote:_request(M_GET, opts, > self.space._format_cdata, > + self._stream_id, > self.space.id, self.id, > box.index.EQ, 0, 2, key)) > end > @@ -1031,6 +1172,7 @@ index_metatable = function(remote) > end > return nothing_or_data(remote:_request(M_MIN, opts, > self.space._format_cdata, > + self._stream_id, > self.space.id, self.id, > box.index.GE, 0, 1, key)) > end > @@ -1042,6 +1184,7 @@ index_metatable = function(remote) > end > return nothing_or_data(remote:_request(M_MAX, opts, > self.space._format_cdata, > + self._stream_id, > self.space.id, self.id, > box.index.LE, 0, 1, key)) > end > @@ -1053,22 +1196,24 @@ index_metatable = function(remote) > end > local code = string.format('box.space.%s.index.%s:count', > self.space.name, self.name) > - return remote:_request(M_COUNT, opts, nil, code, { key, opts }) > + return remote:_request(M_COUNT, opts, nil, self._stream_id, > + code, { key, opts }) > end > > function methods:delete(key, opts) > check_index_arg(self, 'delete') > return nothing_or_data(remote:_request(M_DELETE, opts, > self.space._format_cdata, > - self.space.id, self.id, key)) > + self._stream_id, self.space.id, > + self.id, key)) > end > > function methods:update(key, oplist, opts) > check_index_arg(self, 'update') > return nothing_or_data(remote:_request(M_UPDATE, opts, > self.space._format_cdata, > - self.space.id, self.id, key, > - oplist)) > + self._stream_id, self.space.id, > + self.id, key, oplist)) > end > > return { __index = methods, __metatable = false }