From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from localhost (localhost [127.0.0.1]) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id 912C5284AE for ; Thu, 29 Mar 2018 09:03:25 -0400 (EDT) Received: from turing.freelists.org ([127.0.0.1]) by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id p_jUwdA8v_ch for ; Thu, 29 Mar 2018 09:03:25 -0400 (EDT) Received: from smtp49.i.mail.ru (smtp49.i.mail.ru [94.100.177.109]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id BCBEC2848E for ; Thu, 29 Mar 2018 09:03:24 -0400 (EDT) Date: Thu, 29 Mar 2018 16:03:21 +0300 From: Konstantin Osipov Subject: [tarantool-patches] Re: [PATCH v2 1/3] netbox: allow to create a netbox connection from existing socket Message-ID: <20180329130321.GA21697@atlas> References: <07d883f551feeadcbf50e6c9b4fe37c260f20ec0.1522243429.git.v.shpilevoy@tarantool.org> MIME-Version: 1.0 Content-Type: text/plain; charset=us-ascii Content-Disposition: inline In-Reply-To: <07d883f551feeadcbf50e6c9b4fe37c260f20ec0.1522243429.git.v.shpilevoy@tarantool.org> Sender: tarantool-patches-bounce@freelists.org Errors-to: tarantool-patches-bounce@freelists.org Reply-To: tarantool-patches@freelists.org List-help: List-unsubscribe: List-software: Ecartis version 1.0.0 List-Id: tarantool-patches List-subscribe: List-owner: List-post: List-archive: To: tarantool-patches@freelists.org Cc: Vladislav Shpilevoy * Vladislav Shpilevoy [18/03/28 16:50]: > --- This patch looks good to me, I will push it now. 
> src/box/lua/net_box.lua | 199 ++++++++++++++++++++++++++++++++-------------- > test/box/net.box.result | 91 ++++++++++++++++++++- > test/box/net.box.test.lua | 41 +++++++++- > 3 files changed, 264 insertions(+), 67 deletions(-) > > diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua > index 87c8c548b..5691dcb9e 100644 > --- a/src/box/lua/net_box.lua > +++ b/src/box/lua/net_box.lua > @@ -31,6 +31,7 @@ local sequence_mt = { __serialize = 'sequence' } > local TIMEOUT_INFINITY = 500 * 365 * 86400 > local VSPACE_ID = 281 > local VINDEX_ID = 289 > +local DEFAULT_CONNECT_TIMEOUT = 10 > > local IPROTO_STATUS_KEY = 0x00 > local IPROTO_ERRNO_MASK = 0x7FFF > @@ -70,9 +71,37 @@ local method_codec = { > > local function next_id(id) return band(id + 1, 0x7FFFFFFF) end > > --- function create_transport(host, port, user, password, callback) > -- > --- Transport methods: connect(), close(), perfrom_request(), wait_state() > +-- Connect to a remote server, do handshake. > +-- @param host Hostname. > +-- @param port TCP port. > +-- @param timeout Timeout to connect and receive greeting. > +-- > +-- @retval nil, err Error occurred. The reason is returned. > +-- @retval two non-nils A connected socket and a decoded greeting. > +-- > +local function establish_connection(host, port, timeout) > + local timeout = timeout or DEFAULT_CONNECT_TIMEOUT > + local begin = fiber.clock() > + local s = socket.tcp_connect(host, port, timeout) > + if not s then > + return nil, errno.strerror(errno()) > + end > + local msg = s:read({chunk = IPROTO_GREETING_SIZE}, > + timeout - (fiber.clock() - begin)) > + if not msg then > + local err = s:error() > + s:close() > + return nil, err > + end > + local greeting, err = decode_greeting(msg) > + if not greeting then > + s:close() > + return nil, err > + end > + return s, greeting > +end > + > -- > -- Basically, *transport* is a TCP connection speaking one of > -- Tarantool network protocols. This is a low-level interface.
> @@ -85,16 +114,17 @@ local function next_id(id) return band(id + 1, 0x7FFFFFFF) end > -- > -- Transport state machine: > -- > --- State machine starts in 'initial' state. Connect method changes > --- the state to 'connecting' and spawns a worker fiber. Close > --- method sets the state to 'closed' and kills the worker. > --- If the transport is already in 'error' state close() does nothing. > +-- State machine starts in 'initial' state. New_sm method > +-- accepts an established connection and spawns a worker fiber. > +-- Stop method sets the state to 'closed' and kills the worker. > +-- If the transport is already in 'error' state stop() does > +-- nothing. > -- > -- State chart: > -- > --- initial -> connecting -> active > +-- connecting -> initial +-> active > -- \ > --- -> auth -> fetch_schema <-> active > +-- +-> auth -> fetch_schema <-> active > -- > -- (any state, on error) -> error_reconnect -> connecting -> ... > -- \ > @@ -121,15 +151,15 @@ local function next_id(id) return band(id + 1, 0x7FFFFFFF) end > -- Suggestion for callback writers: sleep a few secs before approving > -- reconnect. > -- > -local function create_transport(host, port, user, password, callback) > +local function create_transport(host, port, user, password, callback, > + connection, greeting) > -- check / normalize credentials > if user == nil and password ~= nil then > box.error(E_PROC_LUA, 'net.box: user is not defined') > end > if user ~= nil and password == nil then password = '' end > > - -- state: current state, only the worker fiber and connect method > - -- change state > + -- Current state machine's state. 
> local state = 'initial' > local last_errno > local last_error > @@ -145,7 +175,6 @@ local function create_transport(host, port, user, password, callback) > local next_request_id = 1 > > local worker_fiber > - local connection > local send_buf = buffer.ibuf(buffer.READAHEAD) > local recv_buf = buffer.ibuf(buffer.READAHEAD) > > @@ -185,16 +214,19 @@ local function create_transport(host, port, user, password, callback) > return state == target_state or target_state[state] or false > end > > - -- CONNECT/CLOSE -- > + -- START/STOP -- > local protocol_sm > > - local function connect() > + local function start() > if state ~= 'initial' then return not is_final_state[state] end > - set_state('connecting') > + if not connection then > + set_state('error', E_NO_CONNECTION) > + return > + end > fiber.create(function() > worker_fiber = fiber_self() > fiber.name(string.format('%s:%s (net.box)', host, port), {truncate=true}) > - ::reconnect:: > + ::handle_connection:: > local ok, err = pcall(protocol_sm) > if not (ok or is_final_state[state]) then > set_state('error', E_UNKNOWN, err) > @@ -204,21 +236,34 @@ local function create_transport(host, port, user, password, callback) > connection = nil > end > local timeout = callback('reconnect_timeout') > - if timeout and state == 'error_reconnect' then > + -- Non-nil reconnect_timeout is an indicator of set > + -- reconnect_after option. > + while timeout and state == 'error_reconnect' do > fiber.sleep(timeout) > + -- During sleep, a connection could be garbage > + -- collected, or explicitly closed by a user, or > + -- the reconnect_after could be reset. Check the > + -- timeout again. 
> timeout = callback('reconnect_timeout') > - if timeout and state == 'error_reconnect' then > - goto reconnect > + if not timeout or state ~= 'error_reconnect' then > + break > + end > + connection, greeting = > + establish_connection(host, port, > + callback('fetch_connect_timeout')) > + if connection then > + goto handle_connection > end > + set_state('error_reconnect', E_NO_CONNECTION, greeting) > + timeout = callback('reconnect_timeout') > end > send_buf:recycle() > recv_buf:recycle() > worker_fiber = nil > end) > - return true > end > > - local function close() > + local function stop() > if not is_final_state[state] then > set_state('closed', E_NO_CONNECTION, 'Connection closed') > end > @@ -367,27 +412,14 @@ local function create_transport(host, port, user, password, callback) > -- collected. See gh-3164, where because of reconnect sleeps > -- in this function, a connection could not be deleted. > -- > - protocol_sm = function () > - local tm_begin, tm = fiber.clock(), callback('fetch_connect_timeout') > - connection = socket.tcp_connect(host, port, tm) > - if connection == nil then > - return error_sm(E_NO_CONNECTION, errno.strerror(errno())) > - end > - local size = IPROTO_GREETING_SIZE > - local err, msg = send_and_recv(size, tm - (fiber.clock() - tm_begin)) > - if err then > - return error_sm(err, msg) > - end > - local g = decode_greeting(ffi.string(recv_buf.rpos, size)) > - recv_buf.rpos = recv_buf.rpos + size > - if not g then > - return error_sm(E_NO_CONNECTION, 'Can\'t decode handshake') > - end > - err, msg = callback('handshake', g) > + protocol_sm = function() > + assert(connection) > + assert(greeting) > + local err, msg = callback('handshake', greeting) > if err then > return error_sm(err, msg) > end > - if g.protocol == 'Lua console' then > + if greeting.protocol == 'Lua console' then > local setup_delimiter = 'require("console").delimiter("$EOF$")\n' > method_codec.inject(send_buf, nil, nil, setup_delimiter) > local err, response = 
send_and_recv_console() > @@ -399,10 +431,11 @@ local function create_transport(host, port, user, password, callback) > local rid = next_request_id > set_state('active') > return console_sm(rid) > - elseif g.protocol == 'Binary' then > - return iproto_auth_sm(g.salt) > + elseif greeting.protocol == 'Binary' then > + return iproto_auth_sm(greeting.salt) > else > - return error_sm(E_NO_CONNECTION, 'Unknown protocol: ' .. g.protocol) > + return error_sm(E_NO_CONNECTION, > + 'Unknown protocol: '..greeting.protocol) > end > end > > @@ -524,14 +557,14 @@ local function create_transport(host, port, user, password, callback) > end > > return { > - close = close, > - connect = connect, > + stop = stop, > + start = start, > wait_state = wait_state, > perform_request = perform_request > } > end > > --- Wrap create_transport, adding auto-close-on-GC feature. > +-- Wrap create_transport, adding auto-stop-on-GC feature. > -- All the GC magic is neatly encapsulated! > -- The tricky part is the callback: > -- * callback (typically) references the transport (indirectly); > @@ -539,23 +572,24 @@ end > -- * fibers are GC roots - i.e. transport is never GC-ed! > -- We solve the issue by making the worker->callback ref weak. > -- Now it is necessary to have a strong ref to callback somewhere or > --- it is GC-ed prematurely. We wrap close() method, stashing the > --- ref in an upvalue (close() performance doesn't matter much.) > -local create_transport = function(host, port, user, password, callback) > +-- it is GC-ed prematurely. We wrap stop() method, stashing the > +-- ref in an upvalue (stop() performance doesn't matter much.) > +local create_transport = function(host, port, user, password, callback, > + connection, greeting) > local weak_refs = setmetatable({callback = callback}, {__mode = 'v'}) > local function weak_callback(...) > local callback = weak_refs.callback > if callback then return callback(...) 
end > end > - local transport = create_transport(host, port, user, > - password, weak_callback) > - local transport_close = transport.close > + local transport = create_transport(host, port, user, password, > + weak_callback, connection, greeting) > + local transport_stop = transport.stop > local gc_hook = ffi.gc(ffi.new('char[1]'), function() > - pcall(transport_close) > + pcall(transport_stop) > end) > - transport.close = function() > + transport.stop = function() > -- dummy gc_hook, callback refs prevent premature GC > - return transport_close(gc_hook, callback) > + return transport_stop(gc_hook, callback) > end > return transport > end > @@ -612,8 +646,7 @@ local console_mt = { > > local space_metatable, index_metatable > > -local function connect(...) > - local host, port, opts = parse_connect_params(...) > +local function new_sm(host, port, opts, connection, greeting) > local user, password = opts.user, opts.password; opts.password = nil > local last_reconnect_error > local remote = {host = host, port = port, opts = opts, state = 'initial'} > @@ -652,7 +685,7 @@ local function connect(...) > elseif what == 'will_fetch_schema' then > return not opts.console > elseif what == 'fetch_connect_timeout' then > - return opts.connect_timeout or 10 > + return opts.connect_timeout or DEFAULT_CONNECT_TIMEOUT > elseif what == 'did_fetch_schema' then > remote:_install_schema(...) > elseif what == 'reconnect_timeout' then > @@ -679,14 +712,56 @@ local function connect(...) 
> remote._on_schema_reload = trigger.new("on_schema_reload") > remote._on_disconnect = trigger.new("on_disconnect") > remote._on_connect = trigger.new("on_connect") > - remote._transport = create_transport(host, port, user, password, callback) > - remote._transport.connect() > + remote._transport = create_transport(host, port, user, password, callback, > + connection, greeting) > + remote._transport.start() > if opts.wait_connected ~= false then > remote._transport.wait_state('active', tonumber(opts.wait_connected)) > end > return remote > end > > +-- > +-- Wrap an existing connection into net.box API. > +-- @param connection Connected socket. > +-- @param greeting Decoded greeting, received from a server. > +-- @param host Hostname to which @a connection is established. > +-- @param port TCP port to which @a connection is established. > +-- @param opts Options like reconnect_after, connect_timeout, > +-- wait_connected, login, password, ... > +-- > +-- @retval Net.box object. > +-- > +local function wrap(connection, greeting, host, port, opts) > + if connection == nil or type(greeting) ~= 'table' then > + error('Usage: netbox.wrap(socket, greeting, [opts])') > + end > + opts = opts or {} > + return new_sm(host, port, opts, connection, greeting) > +end > + > +-- > +-- Connect to a remote server. > +-- @param uri OR host and port. URI is a string like > +-- hostname:port@login:password. Host and port can be > +-- passed separately with login and password in the next > +-- parameter. > +-- @param opts @Sa wrap(). > +-- > +-- @retval Net.box object. > +-- > +local function connect(...) > + local host, port, opts = parse_connect_params(...) 
> + local connection, greeting = > + establish_connection(host, port, opts.connect_timeout) > + if not connection then > + local dummy_conn = new_sm(host, port, opts) > + dummy_conn.error = greeting > + return dummy_conn > + end > + return new_sm(host, port, opts, connection, greeting) > +end > + > local function check_remote_arg(remote, method) > if type(remote) ~= 'table' then > local fmt = 'Use remote:%s(...) instead of remote.%s(...):' > @@ -710,7 +785,7 @@ end > > function remote_methods:close() > check_remote_arg(self, 'close') > - self._transport.close() > + self._transport.stop() > end > > function remote_methods:on_schema_reload(...) > @@ -1112,7 +1187,9 @@ end > local this_module = { > create_transport = create_transport, > connect = connect, > - new = connect -- Tarantool < 1.7.1 compatibility > + new = connect, -- Tarantool < 1.7.1 compatibility, > + wrap = wrap, > + establish_connection = establish_connection, > } > > function this_module.timeout(timeout, ...) > diff --git a/test/box/net.box.result b/test/box/net.box.result > index 46d85b327..c541cad60 100644 > --- a/test/box/net.box.result > +++ b/test/box/net.box.result > @@ -1754,7 +1754,7 @@ nb = net.new('localhost:3392', { > }); > --- > ... > -nb.error == "Timeout exceeded" or nb.error == "Connection timed out"; > +assert(nb.error == 'Operation timed out'); > --- > - true > ... > @@ -2292,12 +2292,97 @@ weak.c > --- > - null > ... > -box.schema.user.revoke('guest', 'execute', 'universe') > +-- > +-- gh-2677: netbox supports console connections, that complicates > +-- both console and netbox. It was necessary because before a > +-- connection is established, a console does not known is it > +-- binary or text protocol, and netbox could not be created from > +-- existing socket. > +-- > +box.schema.user.grant('guest','read,write,execute','universe') > +--- > +... > +urilib = require('uri') > +--- > +... > +uri = urilib.parse(tostring(box.cfg.listen)) > +--- > +... 
> +s, greeting = net.establish_connection(uri.host, uri.service) > +--- > +... > +c = net.wrap(s, greeting, uri.host, uri.service, {reconnect_after = 0.01}) > +--- > +... > +c.state > +--- > +- active > +... > +a = 100 > +--- > +... > +function kek(args) return {1, 2, 3, args} end > +--- > +... > +c:eval('a = 200') > +--- > +... > +a > +--- > +- 200 > +... > +c:call('kek', {300}) > +--- > +- [1, 2, 3, 300] > +... > +s = box.schema.create_space('test') > +--- > +... > +pk = s:create_index('pk') > +--- > +... > +c:reload_schema() > +--- > +... > +c.space.test:replace{1} > +--- > +- [1] > +... > +c.space.test:get{1} > +--- > +- [1] > +... > +c.space.test:delete{1} > +--- > +- [1] > +... > +-- > +-- Break a connection to test reconnect_after. > +-- > +_ = c._transport.perform_request(nil, nil, 'inject', nil, '\x80') > +--- > +... > +c.state > +--- > +- error_reconnect > +... > +while not c:is_connected() do fiber.sleep(0.01) end > +--- > +... > +c:ping() > +--- > +- true > +... > +s:drop() > --- > ... > c:close() > --- > ... > -c = nil > +c.state > +--- > +- closed > +... > +box.schema.user.revoke('guest', 'read,write,execute', 'universe') > --- > ... > diff --git a/test/box/net.box.test.lua b/test/box/net.box.test.lua > index 87e26f84c..d828ac529 100644 > --- a/test/box/net.box.test.lua > +++ b/test/box/net.box.test.lua > @@ -701,7 +701,7 @@ nb = net.new('localhost:3392', { > wait_connected = true, console = true, > connect_timeout = 0.01 > }); > -nb.error == "Timeout exceeded" or nb.error == "Connection timed out"; > +assert(nb.error == 'Operation timed out'); > nb:close(); > -- we must get peer closed > nb = net.new('localhost:3392', { > @@ -937,6 +937,41 @@ collectgarbage('collect') > -- connection is deleted by 'collect'. > weak.c > > -box.schema.user.revoke('guest', 'execute', 'universe') > +-- > +-- gh-2677: netbox supports console connections, that complicates > +-- both console and netbox. 
It was necessary because before a > +-- connection is established, a console does not known is it > +-- binary or text protocol, and netbox could not be created from > +-- existing socket. > +-- > +box.schema.user.grant('guest','read,write,execute','universe') > +urilib = require('uri') > +uri = urilib.parse(tostring(box.cfg.listen)) > +s, greeting = net.establish_connection(uri.host, uri.service) > +c = net.wrap(s, greeting, uri.host, uri.service, {reconnect_after = 0.01}) > +c.state > + > +a = 100 > +function kek(args) return {1, 2, 3, args} end > +c:eval('a = 200') > +a > +c:call('kek', {300}) > +s = box.schema.create_space('test') > +pk = s:create_index('pk') > +c:reload_schema() > +c.space.test:replace{1} > +c.space.test:get{1} > +c.space.test:delete{1} > +-- > +-- Break a connection to test reconnect_after. > +-- > +_ = c._transport.perform_request(nil, nil, 'inject', nil, '\x80') > +c.state > +while not c:is_connected() do fiber.sleep(0.01) end > +c:ping() > + > +s:drop() > c:close() > -c = nil > +c.state > + > +box.schema.user.revoke('guest', 'read,write,execute', 'universe') > -- > 2.14.3 (Apple Git-98) > > -- Konstantin Osipov, Moscow, Russia, +7 903 626 22 32 http://tarantool.io - www.twitter.com/kostja_osipov