* [tarantool-patches] [PATCH v2 0/3] console: do not use netbox for console text connections @ 2018-03-28 13:35 Vladislav Shpilevoy 2018-03-28 13:35 ` [tarantool-patches] [PATCH v2 1/3] netbox: allow to create a netbox connection from existing socket Vladislav Shpilevoy ` (2 more replies) 0 siblings, 3 replies; 6+ messages in thread From: Vladislav Shpilevoy @ 2018-03-28 13:35 UTC (permalink / raw) To: tarantool-patches; +Cc: Vladislav Shpilevoy Branch: http://github.com/tarantool/tarantool/tree/gh-2677-prepare-console-for-push Issue: https://github.com/tarantool/tarantool/issues/2677 Netbox console support complicates both netbox and console. Let's use sockets directly for the text protocol. But the problem is that before a connection is established, the connection protocol is unknown (binary or text). It can be determined only when a greeting is read and decoded. After that the socket must be either wrapped by netbox API for binary connections, or by text console API for text connections. To make that possible, two new netbox functions are introduced: establish_connection and wrap. Netbox.establish_connection connects to a server, then reads and decodes the greeting. Netbox.wrap creates a netbox state machine around a given socket. Vladislav Shpilevoy (3): netbox: allow to create a netbox connection from existing socket console: do not use netbox for console text connections netbox: deprecate console support src/box/lua/console.lua | 162 +++++++++++++++++++++++++++++------- src/box/lua/net_box.lua | 204 ++++++++++++++++++++++++++++++++-------------- test/box/net.box.result | 91 ++++++++++++++++++++- test/box/net.box.test.lua | 41 +++++++++- 4 files changed, 400 insertions(+), 98 deletions(-) -- 2.14.3 (Apple Git-98) ^ permalink raw reply [flat|nested] 6+ messages in thread
* [tarantool-patches] [PATCH v2 1/3] netbox: allow to create a netbox connection from existing socket 2018-03-28 13:35 [tarantool-patches] [PATCH v2 0/3] console: do not use netbox for console text connections Vladislav Shpilevoy @ 2018-03-28 13:35 ` Vladislav Shpilevoy 2018-03-29 13:03 ` [tarantool-patches] " Konstantin Osipov 2018-03-28 13:35 ` [tarantool-patches] [PATCH v2 2/3] console: do not use netbox for console text connections Vladislav Shpilevoy 2018-03-28 13:35 ` [tarantool-patches] [PATCH v2 3/3] netbox: deprecate console support Vladislav Shpilevoy 2 siblings, 1 reply; 6+ messages in thread From: Vladislav Shpilevoy @ 2018-03-28 13:35 UTC (permalink / raw) To: tarantool-patches; +Cc: Vladislav Shpilevoy It is needed to create a binary console connection, when a socket is already created and a greeting is read and decoded. --- src/box/lua/net_box.lua | 199 ++++++++++++++++++++++++++++++++-------------- test/box/net.box.result | 91 ++++++++++++++++++++- test/box/net.box.test.lua | 41 +++++++++- 3 files changed, 264 insertions(+), 67 deletions(-) diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua index 87c8c548b..5691dcb9e 100644 --- a/src/box/lua/net_box.lua +++ b/src/box/lua/net_box.lua @@ -31,6 +31,7 @@ local sequence_mt = { __serialize = 'sequence' } local TIMEOUT_INFINITY = 500 * 365 * 86400 local VSPACE_ID = 281 local VINDEX_ID = 289 +local DEFAULT_CONNECT_TIMEOUT = 10 local IPROTO_STATUS_KEY = 0x00 local IPROTO_ERRNO_MASK = 0x7FFF @@ -70,9 +71,37 @@ local method_codec = { local function next_id(id) return band(id + 1, 0x7FFFFFFF) end --- function create_transport(host, port, user, password, callback) -- --- Transport methods: connect(), close(), perfrom_request(), wait_state() +-- Connect to a remote server, do handshake. +-- @param host Hostname. +-- @param port TCP port. +-- @param timeout Timeout to connect and receive greeting. +-- +-- @retval nil, err Error occured. The reason is returned. 
+-- @retval two non-nils A connected socket and a decoded greeting. +-- +local function establish_connection(host, port, timeout) + local timeout = timeout or DEFAULT_CONNECT_TIMEOUT + local begin = fiber.clock() + local s = socket.tcp_connect(host, port, timeout) + if not s then + return nil, errno.strerror(errno()) + end + local msg = s:read({chunk = IPROTO_GREETING_SIZE}, + timeout - (fiber.clock() - begin)) + if not msg then + local err = s:error() + s:close() + return nil, err + end + local greeting, err = decode_greeting(msg) + if not greeting then + s:close() + return nil, err + end + return s, greeting +end + -- -- Basically, *transport* is a TCP connection speaking one of -- Tarantool network protocols. This is a low-level interface. @@ -85,16 +114,17 @@ local function next_id(id) return band(id + 1, 0x7FFFFFFF) end -- -- Transport state machine: -- --- State machine starts in 'initial' state. Connect method changes --- the state to 'connecting' and spawns a worker fiber. Close --- method sets the state to 'closed' and kills the worker. --- If the transport is already in 'error' state close() does nothing. +-- State machine starts in 'initial' state. New_sm method +-- accepts an established connection and spawns a worker fiber. +-- Stop method sets the state to 'closed' and kills the worker. +-- If the transport is already in 'error' state stop() does +-- nothing. -- -- State chart: -- --- initial -> connecting -> active +-- connecting -> initial +-> active -- \ --- -> auth -> fetch_schema <-> active +-- +-> auth -> fetch_schema <-> active -- -- (any state, on error) -> error_reconnect -> connecting -> ... -- \ @@ -121,15 +151,15 @@ local function next_id(id) return band(id + 1, 0x7FFFFFFF) end -- Suggestion for callback writers: sleep a few secs before approving -- reconnect. 
-- -local function create_transport(host, port, user, password, callback) +local function create_transport(host, port, user, password, callback, + connection, greeting) -- check / normalize credentials if user == nil and password ~= nil then box.error(E_PROC_LUA, 'net.box: user is not defined') end if user ~= nil and password == nil then password = '' end - -- state: current state, only the worker fiber and connect method - -- change state + -- Current state machine's state. local state = 'initial' local last_errno local last_error @@ -145,7 +175,6 @@ local function create_transport(host, port, user, password, callback) local next_request_id = 1 local worker_fiber - local connection local send_buf = buffer.ibuf(buffer.READAHEAD) local recv_buf = buffer.ibuf(buffer.READAHEAD) @@ -185,16 +214,19 @@ local function create_transport(host, port, user, password, callback) return state == target_state or target_state[state] or false end - -- CONNECT/CLOSE -- + -- START/STOP -- local protocol_sm - local function connect() + local function start() if state ~= 'initial' then return not is_final_state[state] end - set_state('connecting') + if not connection then + set_state('error', E_NO_CONNECTION) + return + end fiber.create(function() worker_fiber = fiber_self() fiber.name(string.format('%s:%s (net.box)', host, port), {truncate=true}) - ::reconnect:: + ::handle_connection:: local ok, err = pcall(protocol_sm) if not (ok or is_final_state[state]) then set_state('error', E_UNKNOWN, err) @@ -204,21 +236,34 @@ local function create_transport(host, port, user, password, callback) connection = nil end local timeout = callback('reconnect_timeout') - if timeout and state == 'error_reconnect' then + -- Non-nil reconnect_timeout is an indicator of set + -- reconnect_after option. 
+ while timeout and state == 'error_reconnect' do fiber.sleep(timeout) + -- During sleep, a connection could be garbage + -- collected, or explicitly closed by a user, or + -- the reconnect_after could be reset. Check the + -- timeout again. timeout = callback('reconnect_timeout') - if timeout and state == 'error_reconnect' then - goto reconnect + if not timeout or state ~= 'error_reconnect' then + break + end + connection, greeting = + establish_connection(host, port, + callback('fetch_connect_timeout')) + if connection then + goto handle_connection end + set_state('error_reconnect', E_NO_CONNECTION, greeting) + timeout = callback('reconnect_timeout') end send_buf:recycle() recv_buf:recycle() worker_fiber = nil end) - return true end - local function close() + local function stop() if not is_final_state[state] then set_state('closed', E_NO_CONNECTION, 'Connection closed') end @@ -367,27 +412,14 @@ local function create_transport(host, port, user, password, callback) -- collected. See gh-3164, where because of reconnect sleeps -- in this function, a connection could not be deleted. 
-- - protocol_sm = function () - local tm_begin, tm = fiber.clock(), callback('fetch_connect_timeout') - connection = socket.tcp_connect(host, port, tm) - if connection == nil then - return error_sm(E_NO_CONNECTION, errno.strerror(errno())) - end - local size = IPROTO_GREETING_SIZE - local err, msg = send_and_recv(size, tm - (fiber.clock() - tm_begin)) - if err then - return error_sm(err, msg) - end - local g = decode_greeting(ffi.string(recv_buf.rpos, size)) - recv_buf.rpos = recv_buf.rpos + size - if not g then - return error_sm(E_NO_CONNECTION, 'Can\'t decode handshake') - end - err, msg = callback('handshake', g) + protocol_sm = function() + assert(connection) + assert(greeting) + local err, msg = callback('handshake', greeting) if err then return error_sm(err, msg) end - if g.protocol == 'Lua console' then + if greeting.protocol == 'Lua console' then local setup_delimiter = 'require("console").delimiter("$EOF$")\n' method_codec.inject(send_buf, nil, nil, setup_delimiter) local err, response = send_and_recv_console() @@ -399,10 +431,11 @@ local function create_transport(host, port, user, password, callback) local rid = next_request_id set_state('active') return console_sm(rid) - elseif g.protocol == 'Binary' then - return iproto_auth_sm(g.salt) + elseif greeting.protocol == 'Binary' then + return iproto_auth_sm(greeting.salt) else - return error_sm(E_NO_CONNECTION, 'Unknown protocol: ' .. g.protocol) + return error_sm(E_NO_CONNECTION, + 'Unknown protocol: '..greeting.protocol) end end @@ -524,14 +557,14 @@ local function create_transport(host, port, user, password, callback) end return { - close = close, - connect = connect, + stop = stop, + start = start, wait_state = wait_state, perform_request = perform_request } end --- Wrap create_transport, adding auto-close-on-GC feature. +-- Wrap create_transport, adding auto-stop-on-GC feature. -- All the GC magic is neatly encapsulated! 
-- The tricky part is the callback: -- * callback (typically) references the transport (indirectly); @@ -539,23 +572,24 @@ end -- * fibers are GC roots - i.e. transport is never GC-ed! -- We solve the issue by making the worker->callback ref weak. -- Now it is necessary to have a strong ref to callback somewhere or --- it is GC-ed prematurely. We wrap close() method, stashing the --- ref in an upvalue (close() performance doesn't matter much.) -local create_transport = function(host, port, user, password, callback) +-- it is GC-ed prematurely. We wrap stop() method, stashing the +-- ref in an upvalue (stop() performance doesn't matter much.) +local create_transport = function(host, port, user, password, callback, + connection, greeting) local weak_refs = setmetatable({callback = callback}, {__mode = 'v'}) local function weak_callback(...) local callback = weak_refs.callback if callback then return callback(...) end end - local transport = create_transport(host, port, user, - password, weak_callback) - local transport_close = transport.close + local transport = create_transport(host, port, user, password, + weak_callback, connection, greeting) + local transport_stop = transport.stop local gc_hook = ffi.gc(ffi.new('char[1]'), function() - pcall(transport_close) + pcall(transport_stop) end) - transport.close = function() + transport.stop = function() -- dummy gc_hook, callback refs prevent premature GC - return transport_close(gc_hook, callback) + return transport_stop(gc_hook, callback) end return transport end @@ -612,8 +646,7 @@ local console_mt = { local space_metatable, index_metatable -local function connect(...) - local host, port, opts = parse_connect_params(...) +local function new_sm(host, port, opts, connection, greeting) local user, password = opts.user, opts.password; opts.password = nil local last_reconnect_error local remote = {host = host, port = port, opts = opts, state = 'initial'} @@ -652,7 +685,7 @@ local function connect(...) 
elseif what == 'will_fetch_schema' then return not opts.console elseif what == 'fetch_connect_timeout' then - return opts.connect_timeout or 10 + return opts.connect_timeout or DEFAULT_CONNECT_TIMEOUT elseif what == 'did_fetch_schema' then remote:_install_schema(...) elseif what == 'reconnect_timeout' then @@ -679,14 +712,56 @@ local function connect(...) remote._on_schema_reload = trigger.new("on_schema_reload") remote._on_disconnect = trigger.new("on_disconnect") remote._on_connect = trigger.new("on_connect") - remote._transport = create_transport(host, port, user, password, callback) - remote._transport.connect() + remote._transport = create_transport(host, port, user, password, callback, + connection, greeting) + remote._transport.start() if opts.wait_connected ~= false then remote._transport.wait_state('active', tonumber(opts.wait_connected)) end return remote end +-- +-- Wrap an existing connection into net.box API. +-- @param connection Connected socket. +-- @param greeting Decoded greeting, received from a server. +-- @param host Hostname to which @a connection is established. +-- @param port TCP port to which @a connection is established. +-- @param opts Options like reconnect_after, connect_timeout, +-- wait_connected, login, password, ... +-- +-- @retval Net.box object. +-- +local function wrap(connection, greeting, host, port, opts) + if connection == nil or type(greeting) ~= 'table' then + error('Usage: netbox.wrap(socket, greeting, [opts])') + end + opts = opts or {} + return new_sm(host, port, opts, connection, greeting) +end + +-- +-- Connect to a remote server. +-- @param uri OR host and port. URI is a string like +-- hostname:port@login:password. Host and port can be +-- passed separately with login and password in the next +-- parameter. +-- @param opts @Sa wrap(). +-- +-- @retval Net.box object. +-- +local function connect(...) + local host, port, opts = parse_connect_params(...) 
+ local connection, greeting = + establish_connection(host, port, opts.connect_timeout) + if not connection then + local dummy_conn = new_sm(host, port, opts) + dummy_conn.error = greeting + return dummy_conn + end + return new_sm(host, port, opts, connection, greeting) +end + local function check_remote_arg(remote, method) if type(remote) ~= 'table' then local fmt = 'Use remote:%s(...) instead of remote.%s(...):' @@ -710,7 +785,7 @@ end function remote_methods:close() check_remote_arg(self, 'close') - self._transport.close() + self._transport.stop() end function remote_methods:on_schema_reload(...) @@ -1112,7 +1187,9 @@ end local this_module = { create_transport = create_transport, connect = connect, - new = connect -- Tarantool < 1.7.1 compatibility + new = connect, -- Tarantool < 1.7.1 compatibility, + wrap = wrap, + establish_connection = establish_connection, } function this_module.timeout(timeout, ...) diff --git a/test/box/net.box.result b/test/box/net.box.result index 46d85b327..c541cad60 100644 --- a/test/box/net.box.result +++ b/test/box/net.box.result @@ -1754,7 +1754,7 @@ nb = net.new('localhost:3392', { }); --- ... -nb.error == "Timeout exceeded" or nb.error == "Connection timed out"; +assert(nb.error == 'Operation timed out'); --- - true ... @@ -2292,12 +2292,97 @@ weak.c --- - null ... -box.schema.user.revoke('guest', 'execute', 'universe') +-- +-- gh-2677: netbox supports console connections, that complicates +-- both console and netbox. It was necessary because before a +-- connection is established, a console does not known is it +-- binary or text protocol, and netbox could not be created from +-- existing socket. +-- +box.schema.user.grant('guest','read,write,execute','universe') +--- +... +urilib = require('uri') +--- +... +uri = urilib.parse(tostring(box.cfg.listen)) +--- +... +s, greeting = net.establish_connection(uri.host, uri.service) +--- +... +c = net.wrap(s, greeting, uri.host, uri.service, {reconnect_after = 0.01}) +--- +... 
+c.state +--- +- active +... +a = 100 +--- +... +function kek(args) return {1, 2, 3, args} end +--- +... +c:eval('a = 200') +--- +... +a +--- +- 200 +... +c:call('kek', {300}) +--- +- [1, 2, 3, 300] +... +s = box.schema.create_space('test') +--- +... +pk = s:create_index('pk') +--- +... +c:reload_schema() +--- +... +c.space.test:replace{1} +--- +- [1] +... +c.space.test:get{1} +--- +- [1] +... +c.space.test:delete{1} +--- +- [1] +... +-- +-- Break a connection to test reconnect_after. +-- +_ = c._transport.perform_request(nil, nil, 'inject', nil, '\x80') +--- +... +c.state +--- +- error_reconnect +... +while not c:is_connected() do fiber.sleep(0.01) end +--- +... +c:ping() +--- +- true +... +s:drop() --- ... c:close() --- ... -c = nil +c.state +--- +- closed +... +box.schema.user.revoke('guest', 'read,write,execute', 'universe') --- ... diff --git a/test/box/net.box.test.lua b/test/box/net.box.test.lua index 87e26f84c..d828ac529 100644 --- a/test/box/net.box.test.lua +++ b/test/box/net.box.test.lua @@ -701,7 +701,7 @@ nb = net.new('localhost:3392', { wait_connected = true, console = true, connect_timeout = 0.01 }); -nb.error == "Timeout exceeded" or nb.error == "Connection timed out"; +assert(nb.error == 'Operation timed out'); nb:close(); -- we must get peer closed nb = net.new('localhost:3392', { @@ -937,6 +937,41 @@ collectgarbage('collect') -- connection is deleted by 'collect'. weak.c -box.schema.user.revoke('guest', 'execute', 'universe') +-- +-- gh-2677: netbox supports console connections, that complicates +-- both console and netbox. It was necessary because before a +-- connection is established, a console does not known is it +-- binary or text protocol, and netbox could not be created from +-- existing socket. 
+-- +box.schema.user.grant('guest','read,write,execute','universe') +urilib = require('uri') +uri = urilib.parse(tostring(box.cfg.listen)) +s, greeting = net.establish_connection(uri.host, uri.service) +c = net.wrap(s, greeting, uri.host, uri.service, {reconnect_after = 0.01}) +c.state + +a = 100 +function kek(args) return {1, 2, 3, args} end +c:eval('a = 200') +a +c:call('kek', {300}) +s = box.schema.create_space('test') +pk = s:create_index('pk') +c:reload_schema() +c.space.test:replace{1} +c.space.test:get{1} +c.space.test:delete{1} +-- +-- Break a connection to test reconnect_after. +-- +_ = c._transport.perform_request(nil, nil, 'inject', nil, '\x80') +c.state +while not c:is_connected() do fiber.sleep(0.01) end +c:ping() + +s:drop() c:close() -c = nil +c.state + +box.schema.user.revoke('guest', 'read,write,execute', 'universe') -- 2.14.3 (Apple Git-98) ^ permalink raw reply [flat|nested] 6+ messages in thread
* [tarantool-patches] Re: [PATCH v2 1/3] netbox: allow to create a netbox connection from existing socket 2018-03-28 13:35 ` [tarantool-patches] [PATCH v2 1/3] netbox: allow to create a netbox connection from existing socket Vladislav Shpilevoy @ 2018-03-29 13:03 ` Konstantin Osipov 0 siblings, 0 replies; 6+ messages in thread From: Konstantin Osipov @ 2018-03-29 13:03 UTC (permalink / raw) To: tarantool-patches; +Cc: Vladislav Shpilevoy * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [18/03/28 16:50]: > --- This patch looks good to me, I will push it now. > src/box/lua/net_box.lua | 199 ++++++++++++++++++++++++++++++++-------------- > test/box/net.box.result | 91 ++++++++++++++++++++- > test/box/net.box.test.lua | 41 +++++++++- > 3 files changed, 264 insertions(+), 67 deletions(-) > > diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua > index 87c8c548b..5691dcb9e 100644 > --- a/src/box/lua/net_box.lua > +++ b/src/box/lua/net_box.lua > @@ -31,6 +31,7 @@ local sequence_mt = { __serialize = 'sequence' } > local TIMEOUT_INFINITY = 500 * 365 * 86400 > local VSPACE_ID = 281 > local VINDEX_ID = 289 > +local DEFAULT_CONNECT_TIMEOUT = 10 > > local IPROTO_STATUS_KEY = 0x00 > local IPROTO_ERRNO_MASK = 0x7FFF > @@ -70,9 +71,37 @@ local method_codec = { > > local function next_id(id) return band(id + 1, 0x7FFFFFFF) end > > --- function create_transport(host, port, user, password, callback) > -- > --- Transport methods: connect(), close(), perfrom_request(), wait_state() > +-- Connect to a remote server, do handshake. > +-- @param host Hostname. > +-- @param port TCP port. > +-- @param timeout Timeout to connect and receive greeting. > +-- > +-- @retval nil, err Error occured. The reason is returned. > +-- @retval two non-nils A connected socket and a decoded greeting. 
> +-- > +local function establish_connection(host, port, timeout) > + local timeout = timeout or DEFAULT_CONNECT_TIMEOUT > + local begin = fiber.clock() > + local s = socket.tcp_connect(host, port, timeout) > + if not s then > + return nil, errno.strerror(errno()) > + end > + local msg = s:read({chunk = IPROTO_GREETING_SIZE}, > + timeout - (fiber.clock() - begin)) > + if not msg then > + local err = s:error() > + s:close() > + return nil, err > + end > + local greeting, err = decode_greeting(msg) > + if not greeting then > + s:close() > + return nil, err > + end > + return s, greeting > +end > + > -- > -- Basically, *transport* is a TCP connection speaking one of > -- Tarantool network protocols. This is a low-level interface. > @@ -85,16 +114,17 @@ local function next_id(id) return band(id + 1, 0x7FFFFFFF) end > -- > -- Transport state machine: > -- > --- State machine starts in 'initial' state. Connect method changes > --- the state to 'connecting' and spawns a worker fiber. Close > --- method sets the state to 'closed' and kills the worker. > --- If the transport is already in 'error' state close() does nothing. > +-- State machine starts in 'initial' state. New_sm method > +-- accepts an established connection and spawns a worker fiber. > +-- Stop method sets the state to 'closed' and kills the worker. > +-- If the transport is already in 'error' state stop() does > +-- nothing. > -- > -- State chart: > -- > --- initial -> connecting -> active > +-- connecting -> initial +-> active > -- \ > --- -> auth -> fetch_schema <-> active > +-- +-> auth -> fetch_schema <-> active > -- > -- (any state, on error) -> error_reconnect -> connecting -> ... > -- \ > @@ -121,15 +151,15 @@ local function next_id(id) return band(id + 1, 0x7FFFFFFF) end > -- Suggestion for callback writers: sleep a few secs before approving > -- reconnect. 
> -- > -local function create_transport(host, port, user, password, callback) > +local function create_transport(host, port, user, password, callback, > + connection, greeting) > -- check / normalize credentials > if user == nil and password ~= nil then > box.error(E_PROC_LUA, 'net.box: user is not defined') > end > if user ~= nil and password == nil then password = '' end > > - -- state: current state, only the worker fiber and connect method > - -- change state > + -- Current state machine's state. > local state = 'initial' > local last_errno > local last_error > @@ -145,7 +175,6 @@ local function create_transport(host, port, user, password, callback) > local next_request_id = 1 > > local worker_fiber > - local connection > local send_buf = buffer.ibuf(buffer.READAHEAD) > local recv_buf = buffer.ibuf(buffer.READAHEAD) > > @@ -185,16 +214,19 @@ local function create_transport(host, port, user, password, callback) > return state == target_state or target_state[state] or false > end > > - -- CONNECT/CLOSE -- > + -- START/STOP -- > local protocol_sm > > - local function connect() > + local function start() > if state ~= 'initial' then return not is_final_state[state] end > - set_state('connecting') > + if not connection then > + set_state('error', E_NO_CONNECTION) > + return > + end > fiber.create(function() > worker_fiber = fiber_self() > fiber.name(string.format('%s:%s (net.box)', host, port), {truncate=true}) > - ::reconnect:: > + ::handle_connection:: > local ok, err = pcall(protocol_sm) > if not (ok or is_final_state[state]) then > set_state('error', E_UNKNOWN, err) > @@ -204,21 +236,34 @@ local function create_transport(host, port, user, password, callback) > connection = nil > end > local timeout = callback('reconnect_timeout') > - if timeout and state == 'error_reconnect' then > + -- Non-nil reconnect_timeout is an indicator of set > + -- reconnect_after option. 
> + while timeout and state == 'error_reconnect' do > fiber.sleep(timeout) > + -- During sleep, a connection could be garbage > + -- collected, or explicitly closed by a user, or > + -- the reconnect_after could be reset. Check the > + -- timeout again. > timeout = callback('reconnect_timeout') > - if timeout and state == 'error_reconnect' then > - goto reconnect > + if not timeout or state ~= 'error_reconnect' then > + break > + end > + connection, greeting = > + establish_connection(host, port, > + callback('fetch_connect_timeout')) > + if connection then > + goto handle_connection > end > + set_state('error_reconnect', E_NO_CONNECTION, greeting) > + timeout = callback('reconnect_timeout') > end > send_buf:recycle() > recv_buf:recycle() > worker_fiber = nil > end) > - return true > end > > - local function close() > + local function stop() > if not is_final_state[state] then > set_state('closed', E_NO_CONNECTION, 'Connection closed') > end > @@ -367,27 +412,14 @@ local function create_transport(host, port, user, password, callback) > -- collected. See gh-3164, where because of reconnect sleeps > -- in this function, a connection could not be deleted. 
> -- > - protocol_sm = function () > - local tm_begin, tm = fiber.clock(), callback('fetch_connect_timeout') > - connection = socket.tcp_connect(host, port, tm) > - if connection == nil then > - return error_sm(E_NO_CONNECTION, errno.strerror(errno())) > - end > - local size = IPROTO_GREETING_SIZE > - local err, msg = send_and_recv(size, tm - (fiber.clock() - tm_begin)) > - if err then > - return error_sm(err, msg) > - end > - local g = decode_greeting(ffi.string(recv_buf.rpos, size)) > - recv_buf.rpos = recv_buf.rpos + size > - if not g then > - return error_sm(E_NO_CONNECTION, 'Can\'t decode handshake') > - end > - err, msg = callback('handshake', g) > + protocol_sm = function() > + assert(connection) > + assert(greeting) > + local err, msg = callback('handshake', greeting) > if err then > return error_sm(err, msg) > end > - if g.protocol == 'Lua console' then > + if greeting.protocol == 'Lua console' then > local setup_delimiter = 'require("console").delimiter("$EOF$")\n' > method_codec.inject(send_buf, nil, nil, setup_delimiter) > local err, response = send_and_recv_console() > @@ -399,10 +431,11 @@ local function create_transport(host, port, user, password, callback) > local rid = next_request_id > set_state('active') > return console_sm(rid) > - elseif g.protocol == 'Binary' then > - return iproto_auth_sm(g.salt) > + elseif greeting.protocol == 'Binary' then > + return iproto_auth_sm(greeting.salt) > else > - return error_sm(E_NO_CONNECTION, 'Unknown protocol: ' .. g.protocol) > + return error_sm(E_NO_CONNECTION, > + 'Unknown protocol: '..greeting.protocol) > end > end > > @@ -524,14 +557,14 @@ local function create_transport(host, port, user, password, callback) > end > > return { > - close = close, > - connect = connect, > + stop = stop, > + start = start, > wait_state = wait_state, > perform_request = perform_request > } > end > > --- Wrap create_transport, adding auto-close-on-GC feature. > +-- Wrap create_transport, adding auto-stop-on-GC feature. 
> -- All the GC magic is neatly encapsulated! > -- The tricky part is the callback: > -- * callback (typically) references the transport (indirectly); > @@ -539,23 +572,24 @@ end > -- * fibers are GC roots - i.e. transport is never GC-ed! > -- We solve the issue by making the worker->callback ref weak. > -- Now it is necessary to have a strong ref to callback somewhere or > --- it is GC-ed prematurely. We wrap close() method, stashing the > --- ref in an upvalue (close() performance doesn't matter much.) > -local create_transport = function(host, port, user, password, callback) > +-- it is GC-ed prematurely. We wrap stop() method, stashing the > +-- ref in an upvalue (stop() performance doesn't matter much.) > +local create_transport = function(host, port, user, password, callback, > + connection, greeting) > local weak_refs = setmetatable({callback = callback}, {__mode = 'v'}) > local function weak_callback(...) > local callback = weak_refs.callback > if callback then return callback(...) end > end > - local transport = create_transport(host, port, user, > - password, weak_callback) > - local transport_close = transport.close > + local transport = create_transport(host, port, user, password, > + weak_callback, connection, greeting) > + local transport_stop = transport.stop > local gc_hook = ffi.gc(ffi.new('char[1]'), function() > - pcall(transport_close) > + pcall(transport_stop) > end) > - transport.close = function() > + transport.stop = function() > -- dummy gc_hook, callback refs prevent premature GC > - return transport_close(gc_hook, callback) > + return transport_stop(gc_hook, callback) > end > return transport > end > @@ -612,8 +646,7 @@ local console_mt = { > > local space_metatable, index_metatable > > -local function connect(...) > - local host, port, opts = parse_connect_params(...) 
> +local function new_sm(host, port, opts, connection, greeting) > local user, password = opts.user, opts.password; opts.password = nil > local last_reconnect_error > local remote = {host = host, port = port, opts = opts, state = 'initial'} > @@ -652,7 +685,7 @@ local function connect(...) > elseif what == 'will_fetch_schema' then > return not opts.console > elseif what == 'fetch_connect_timeout' then > - return opts.connect_timeout or 10 > + return opts.connect_timeout or DEFAULT_CONNECT_TIMEOUT > elseif what == 'did_fetch_schema' then > remote:_install_schema(...) > elseif what == 'reconnect_timeout' then > @@ -679,14 +712,56 @@ local function connect(...) > remote._on_schema_reload = trigger.new("on_schema_reload") > remote._on_disconnect = trigger.new("on_disconnect") > remote._on_connect = trigger.new("on_connect") > - remote._transport = create_transport(host, port, user, password, callback) > - remote._transport.connect() > + remote._transport = create_transport(host, port, user, password, callback, > + connection, greeting) > + remote._transport.start() > if opts.wait_connected ~= false then > remote._transport.wait_state('active', tonumber(opts.wait_connected)) > end > return remote > end > > +-- > +-- Wrap an existing connection into net.box API. > +-- @param connection Connected socket. > +-- @param greeting Decoded greeting, received from a server. > +-- @param host Hostname to which @a connection is established. > +-- @param port TCP port to which @a connection is established. > +-- @param opts Options like reconnect_after, connect_timeout, > +-- wait_connected, login, password, ... > +-- > +-- @retval Net.box object. > +-- > +local function wrap(connection, greeting, host, port, opts) > + if connection == nil or type(greeting) ~= 'table' then > + error('Usage: netbox.wrap(socket, greeting, [opts])') > + end > + opts = opts or {} > + return new_sm(host, port, opts, connection, greeting) > +end > + > +-- > +-- Connect to a remote server. 
> +-- @param uri OR host and port. URI is a string like > +-- hostname:port@login:password. Host and port can be > +-- passed separately with login and password in the next > +-- parameter. > +-- @param opts @Sa wrap(). > +-- > +-- @retval Net.box object. > +-- > +local function connect(...) > + local host, port, opts = parse_connect_params(...) > + local connection, greeting = > + establish_connection(host, port, opts.connect_timeout) > + if not connection then > + local dummy_conn = new_sm(host, port, opts) > + dummy_conn.error = greeting > + return dummy_conn > + end > + return new_sm(host, port, opts, connection, greeting) > +end > + > local function check_remote_arg(remote, method) > if type(remote) ~= 'table' then > local fmt = 'Use remote:%s(...) instead of remote.%s(...):' > @@ -710,7 +785,7 @@ end > > function remote_methods:close() > check_remote_arg(self, 'close') > - self._transport.close() > + self._transport.stop() > end > > function remote_methods:on_schema_reload(...) > @@ -1112,7 +1187,9 @@ end > local this_module = { > create_transport = create_transport, > connect = connect, > - new = connect -- Tarantool < 1.7.1 compatibility > + new = connect, -- Tarantool < 1.7.1 compatibility, > + wrap = wrap, > + establish_connection = establish_connection, > } > > function this_module.timeout(timeout, ...) > diff --git a/test/box/net.box.result b/test/box/net.box.result > index 46d85b327..c541cad60 100644 > --- a/test/box/net.box.result > +++ b/test/box/net.box.result > @@ -1754,7 +1754,7 @@ nb = net.new('localhost:3392', { > }); > --- > ... > -nb.error == "Timeout exceeded" or nb.error == "Connection timed out"; > +assert(nb.error == 'Operation timed out'); > --- > - true > ... > @@ -2292,12 +2292,97 @@ weak.c > --- > - null > ... > -box.schema.user.revoke('guest', 'execute', 'universe') > +-- > +-- gh-2677: netbox supports console connections, that complicates > +-- both console and netbox. 
It was necessary because before a > +-- connection is established, a console does not known is it > +-- binary or text protocol, and netbox could not be created from > +-- existing socket. > +-- > +box.schema.user.grant('guest','read,write,execute','universe') > +--- > +... > +urilib = require('uri') > +--- > +... > +uri = urilib.parse(tostring(box.cfg.listen)) > +--- > +... > +s, greeting = net.establish_connection(uri.host, uri.service) > +--- > +... > +c = net.wrap(s, greeting, uri.host, uri.service, {reconnect_after = 0.01}) > +--- > +... > +c.state > +--- > +- active > +... > +a = 100 > +--- > +... > +function kek(args) return {1, 2, 3, args} end > +--- > +... > +c:eval('a = 200') > +--- > +... > +a > +--- > +- 200 > +... > +c:call('kek', {300}) > +--- > +- [1, 2, 3, 300] > +... > +s = box.schema.create_space('test') > +--- > +... > +pk = s:create_index('pk') > +--- > +... > +c:reload_schema() > +--- > +... > +c.space.test:replace{1} > +--- > +- [1] > +... > +c.space.test:get{1} > +--- > +- [1] > +... > +c.space.test:delete{1} > +--- > +- [1] > +... > +-- > +-- Break a connection to test reconnect_after. > +-- > +_ = c._transport.perform_request(nil, nil, 'inject', nil, '\x80') > +--- > +... > +c.state > +--- > +- error_reconnect > +... > +while not c:is_connected() do fiber.sleep(0.01) end > +--- > +... > +c:ping() > +--- > +- true > +... > +s:drop() > --- > ... > c:close() > --- > ... > -c = nil > +c.state > +--- > +- closed > +... > +box.schema.user.revoke('guest', 'read,write,execute', 'universe') > --- > ... 
> diff --git a/test/box/net.box.test.lua b/test/box/net.box.test.lua > index 87e26f84c..d828ac529 100644 > --- a/test/box/net.box.test.lua > +++ b/test/box/net.box.test.lua > @@ -701,7 +701,7 @@ nb = net.new('localhost:3392', { > wait_connected = true, console = true, > connect_timeout = 0.01 > }); > -nb.error == "Timeout exceeded" or nb.error == "Connection timed out"; > +assert(nb.error == 'Operation timed out'); > nb:close(); > -- we must get peer closed > nb = net.new('localhost:3392', { > @@ -937,6 +937,41 @@ collectgarbage('collect') > -- connection is deleted by 'collect'. > weak.c > > -box.schema.user.revoke('guest', 'execute', 'universe') > +-- > +-- gh-2677: netbox supports console connections, that complicates > +-- both console and netbox. It was necessary because before a > +-- connection is established, a console does not known is it > +-- binary or text protocol, and netbox could not be created from > +-- existing socket. > +-- > +box.schema.user.grant('guest','read,write,execute','universe') > +urilib = require('uri') > +uri = urilib.parse(tostring(box.cfg.listen)) > +s, greeting = net.establish_connection(uri.host, uri.service) > +c = net.wrap(s, greeting, uri.host, uri.service, {reconnect_after = 0.01}) > +c.state > + > +a = 100 > +function kek(args) return {1, 2, 3, args} end > +c:eval('a = 200') > +a > +c:call('kek', {300}) > +s = box.schema.create_space('test') > +pk = s:create_index('pk') > +c:reload_schema() > +c.space.test:replace{1} > +c.space.test:get{1} > +c.space.test:delete{1} > +-- > +-- Break a connection to test reconnect_after. 
> +-- > +_ = c._transport.perform_request(nil, nil, 'inject', nil, '\x80') > +c.state > +while not c:is_connected() do fiber.sleep(0.01) end > +c:ping() > + > +s:drop() > c:close() > -c = nil > +c.state > + > +box.schema.user.revoke('guest', 'read,write,execute', 'universe') > -- > 2.14.3 (Apple Git-98) > > -- Konstantin Osipov, Moscow, Russia, +7 903 626 22 32 http://tarantool.io - www.twitter.com/kostja_osipov ^ permalink raw reply [flat|nested] 6+ messages in thread
* [tarantool-patches] [PATCH v2 2/3] console: do not use netbox for console text connections 2018-03-28 13:35 [tarantool-patches] [PATCH v2 0/3] console: do not use netbox for console text connections Vladislav Shpilevoy 2018-03-28 13:35 ` [tarantool-patches] [PATCH v2 1/3] netbox: allow to create a netbox connection from existing socket Vladislav Shpilevoy @ 2018-03-28 13:35 ` Vladislav Shpilevoy 2018-03-29 13:03 ` [tarantool-patches] " Konstantin Osipov 2018-03-28 13:35 ` [tarantool-patches] [PATCH v2 3/3] netbox: deprecate console support Vladislav Shpilevoy 2 siblings, 1 reply; 6+ messages in thread From: Vladislav Shpilevoy @ 2018-03-28 13:35 UTC (permalink / raw) To: tarantool-patches; +Cc: Vladislav Shpilevoy Netbox console support complicates both netbox and console. Lets use sockets directly for text protocol. Part of #2677 --- src/box/lua/console.lua | 162 +++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 131 insertions(+), 31 deletions(-) diff --git a/src/box/lua/console.lua b/src/box/lua/console.lua index d49cf42be..b853d523c 100644 --- a/src/box/lua/console.lua +++ b/src/box/lua/console.lua @@ -8,6 +8,9 @@ local log = require('log') local errno = require('errno') local urilib = require('uri') local yaml = require('yaml') +local net_box = require('net.box') + +local YAML_TERM = '\n...\n' -- admin formatter must be able to encode any Lua variable local formatter = yaml.new() @@ -76,26 +79,117 @@ local function eval(line) return local_eval(nil, line) end +local text_connection_mt = { + __index = { + -- + -- Close underlying socket. + -- + close = function(self) + self._socket:close() + end, + -- + -- Write a text into a socket. + -- @param test Text to send. + -- @retval not nil Bytes sent. + -- @retval nil Error. + -- + write = function(self, text) + -- It is the hack to protect from SIGPIPE, which is + -- not ignored under debugger (gdb, lldb) on send in + -- a socket, that is actually closed. 
If a socket is + -- readable and read() returns nothing then the socket + -- is closed, and writing into it will raise SIGPIPE. + if self._socket:readable(0) then + local rc = self._socket:read({chunk = 1}) + if not rc or rc == '' then + return nil + else + assert(#rc == 1) + -- Make the char be unread. + self._socket.rbuf.wpos = self._socket.rbuf.wpos - 1 + end + end + return self._socket:write(text) + end, + -- + -- Read a text from a socket until YAML terminator. + -- @retval not nil Well formatted YAML. + -- @retval nil Error. + -- + read = function(self) + local ret = self._socket:read(YAML_TERM) + if ret and ret ~= '' then + return ret + end + end, + -- + -- Write + Read. + -- + eval = function(self, text) + text = text..'$EOF$\n' + if self:write(text) then + local rc = self:read() + if rc then + return rc + end + end + error(self:set_error()) + end, + -- + -- Make the connection be in error state, set error + -- message. + -- @retval Error message. + -- + set_error = function(self) + self.state = 'error' + self.error = self._socket:error() + if not self.error then + self.error = 'Peer closed' + end + return self.error + end, + } +} + +-- +-- Wrap an existing socket inside a netbox-like object. +-- @param connection Socket to wrap. +-- @param url Parsed destination URL. +-- @retval nil, err Error, and err contains an error message. +-- @retval not nil Netbox-like object. 
+-- +local function wrap_text_socket(connection, url) + local conn = setmetatable({ + _socket = connection, + state = 'active', + host = url.host or 'localhost', + port = url.service, + }, text_connection_mt) + if not conn:write('require("console").delimiter("$EOF$")\n') or + not conn:read() then + conn:set_error() + end + return conn +end + -- -- Evaluate command on remote instance -- local function remote_eval(self, line) - if not line or self.remote.state ~= 'active' then - local err = self.remote.error - self.remote:close() - self.remote = nil - -- restore local REPL mode - self.eval = nil - self.prompt = nil - self.completion = nil - pcall(self.on_client_disconnect, self) - return (err and format(false, err)) or '' + if line and self.remote.state == 'active' then + local ok, res = pcall(self.remote.eval, self.remote, line) + if self.remote.state == 'active' then + return ok and res or format(false, res) + end end - -- - -- execute line - -- - local ok, res = pcall(self.remote.eval, self.remote, line) - return ok and res or format(false, res) + local err = self.remote.error + self.remote:close() + self.remote = nil + self.eval = nil + self.prompt = nil + self.completion = nil + pcall(self.on_client_disconnect, self) + return (err and format(false, err)) or '' end -- @@ -285,12 +379,7 @@ end -- -- Connect to remove instance -- -local netbox_connect local function connect(uri, opts) - if not netbox_connect then -- workaround the broken loader - netbox_connect = require('net.box').connect - end - opts = opts or {} local self = fiber.self().storage.console @@ -306,18 +395,29 @@ local function connect(uri, opts) error('Usage: console.connect("[login:password@][host:]port")') end - -- connect to remote host + local connection, greeting = + net_box.establish_connection(u.host, u.service, opts.timeout) + if not connection then + log.verbose(greeting) + box.error(box.error.NO_CONNECTION) + end local remote - remote = netbox_connect(u.host, u.service, { - user = u.login, 
password = u.password, - console = true, connect_timeout = opts.timeout - }) - remote.host, remote.port = u.host or 'localhost', u.service - - -- run disconnect trigger if connection failed - if not remote:is_connected() then - pcall(self.on_client_disconnect, self) - error('Connection is not established: '..remote.error) + if greeting.protocol == 'Lua console' then + remote = wrap_text_socket(connection, u) + else + opts = { + connect_timeout = opts.timeout, + user = u.login, + password = u.password, + } + remote = net_box.wrap(connection, greeting, u.host, u.service, opts) + if not remote.host then + remote.host = 'localhost' + end + local old_eval = remote.eval + remote.eval = function(con, line) + return old_eval(con, 'return require("console").eval(...)', {line}) + end end -- check connection && permissions -- 2.14.3 (Apple Git-98) ^ permalink raw reply [flat|nested] 6+ messages in thread
* [tarantool-patches] Re: [PATCH v2 2/3] console: do not use netbox for console text connections 2018-03-28 13:35 ` [tarantool-patches] [PATCH v2 2/3] console: do not use netbox for console text connections Vladislav Shpilevoy @ 2018-03-29 13:03 ` Konstantin Osipov 0 siblings, 0 replies; 6+ messages in thread From: Konstantin Osipov @ 2018-03-29 13:03 UTC (permalink / raw) To: tarantool-patches; +Cc: Vladislav Shpilevoy * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [18/03/28 16:50]: You mentioned you were going to get rid of the read() to detect sigpipe, and investigate sigpipe problem more thoroughly. Please do. > + write = function(self, text) > + -- It is the hack to protect from SIGPIPE, which is > + -- not ignored under debugger (gdb, lldb) on send in > + -- a socket, that is actually closed. If a socket is > + -- readable and read() returns nothing then the socket > + -- is closed, and writing into it will raise SIGPIPE. > + if self._socket:readable(0) then > + local rc = self._socket:read({chunk = 1}) > + if not rc or rc == '' then > + return nil > + else > + assert(#rc == 1) > + -- Make the char be unread. > + self._socket.rbuf.wpos = self._socket.rbuf.wpos - 1 > + end > + end > + return self._socket:write(text) -- Konstantin Osipov, Moscow, Russia, +7 903 626 22 32 http://tarantool.io - www.twitter.com/kostja_osipov ^ permalink raw reply [flat|nested] 6+ messages in thread
* [tarantool-patches] [PATCH v2 3/3] netbox: deprecate console support 2018-03-28 13:35 [tarantool-patches] [PATCH v2 0/3] console: do not use netbox for console text connections Vladislav Shpilevoy 2018-03-28 13:35 ` [tarantool-patches] [PATCH v2 1/3] netbox: allow to create a netbox connection from existing socket Vladislav Shpilevoy 2018-03-28 13:35 ` [tarantool-patches] [PATCH v2 2/3] console: do not use netbox for console text connections Vladislav Shpilevoy @ 2018-03-28 13:35 ` Vladislav Shpilevoy 2 siblings, 0 replies; 6+ messages in thread From: Vladislav Shpilevoy @ 2018-03-28 13:35 UTC (permalink / raw) To: tarantool-patches; +Cc: Vladislav Shpilevoy Print warning about that. After a while the console support will be deleted from netbox. --- src/box/lua/net_box.lua | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua index 5691dcb9e..0309eef17 100644 --- a/src/box/lua/net_box.lua +++ b/src/box/lua/net_box.lua @@ -650,6 +650,11 @@ local function new_sm(host, port, opts, connection, greeting) local user, password = opts.user, opts.password; opts.password = nil local last_reconnect_error local remote = {host = host, port = port, opts = opts, state = 'initial'} + -- @deprecated since 1.10 + if greeting and greeting.protocol == 'Lua console' or opts.console then + log.warn("Netbox text protocol support is deprecated since 1.10, ".. + "please use require('console').connect() instead") + end local function callback(what, ...) if what == 'state_changed' then local state, errno, err = ... -- 2.14.3 (Apple Git-98) ^ permalink raw reply [flat|nested] 6+ messages in thread
end of thread, other threads:[~2018-03-29 13:03 UTC | newest] Thread overview: 6+ messages (download: mbox.gz / follow: Atom feed) -- links below jump to the message on this page -- 2018-03-28 13:35 [tarantool-patches] [PATCH v2 0/3] console: do not use netbox for console text connections Vladislav Shpilevoy 2018-03-28 13:35 ` [tarantool-patches] [PATCH v2 1/3] netbox: allow to create a netbox connection from existing socket Vladislav Shpilevoy 2018-03-29 13:03 ` [tarantool-patches] " Konstantin Osipov 2018-03-28 13:35 ` [tarantool-patches] [PATCH v2 2/3] console: do not use netbox for console text connections Vladislav Shpilevoy 2018-03-29 13:03 ` [tarantool-patches] " Konstantin Osipov 2018-03-28 13:35 ` [tarantool-patches] [PATCH v2 3/3] netbox: deprecate console support Vladislav Shpilevoy
This is a public inbox; see the mirroring instructions for how to clone and mirror all data and code used for this inbox.