From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from localhost (localhost [127.0.0.1]) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id 82BE6287E5 for ; Wed, 28 Mar 2018 09:35:54 -0400 (EDT) Received: from turing.freelists.org ([127.0.0.1]) by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id 8Ts8Vi-Hd3tw for ; Wed, 28 Mar 2018 09:35:54 -0400 (EDT) Received: from smtp63.i.mail.ru (smtp63.i.mail.ru [217.69.128.43]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id D084227EAD for ; Wed, 28 Mar 2018 09:35:53 -0400 (EDT) From: Vladislav Shpilevoy Subject: [tarantool-patches] [PATCH v2 1/3] netbox: allow to create a netbox connection from existing socket Date: Wed, 28 Mar 2018 16:35:48 +0300 Message-Id: <07d883f551feeadcbf50e6c9b4fe37c260f20ec0.1522243429.git.v.shpilevoy@tarantool.org> In-Reply-To: References: In-Reply-To: References: Sender: tarantool-patches-bounce@freelists.org Errors-to: tarantool-patches-bounce@freelists.org Reply-To: tarantool-patches@freelists.org List-help: List-unsubscribe: List-software: Ecartis version 1.0.0 List-Id: tarantool-patches List-subscribe: List-owner: List-post: List-archive: To: tarantool-patches@freelists.org Cc: Vladislav Shpilevoy This is needed to create a binary console connection when the socket has already been created and the greeting has already been read and decoded.
--- src/box/lua/net_box.lua | 199 ++++++++++++++++++++++++++++++++-------------- test/box/net.box.result | 91 ++++++++++++++++++++- test/box/net.box.test.lua | 41 +++++++++- 3 files changed, 264 insertions(+), 67 deletions(-) diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua index 87c8c548b..5691dcb9e 100644 --- a/src/box/lua/net_box.lua +++ b/src/box/lua/net_box.lua @@ -31,6 +31,7 @@ local sequence_mt = { __serialize = 'sequence' } local TIMEOUT_INFINITY = 500 * 365 * 86400 local VSPACE_ID = 281 local VINDEX_ID = 289 +local DEFAULT_CONNECT_TIMEOUT = 10 local IPROTO_STATUS_KEY = 0x00 local IPROTO_ERRNO_MASK = 0x7FFF @@ -70,9 +71,37 @@ local method_codec = { local function next_id(id) return band(id + 1, 0x7FFFFFFF) end --- function create_transport(host, port, user, password, callback) -- --- Transport methods: connect(), close(), perfrom_request(), wait_state() +-- Connect to a remote server, then read and decode its greeting. +-- @param host Hostname. +-- @param port TCP port. +-- @param timeout Total time budget to connect and read greeting. +-- +-- @retval nil, err Error occurred; err describes the reason. +-- @retval two non-nils A connected socket and a decoded greeting. +-- +local function establish_connection(host, port, timeout) + timeout = timeout or DEFAULT_CONNECT_TIMEOUT + local begin = fiber.clock() + local s = socket.tcp_connect(host, port, timeout) + if not s then + return nil, errno.strerror(errno()) + end + local msg = s:read({chunk = IPROTO_GREETING_SIZE}, + timeout - (fiber.clock() - begin)) + if not msg then + local err = s:error() + s:close() + return nil, err + end + local greeting, err = decode_greeting(msg) + if not greeting then + s:close() + return nil, err or 'Can\'t decode handshake' + end + return s, greeting +end + -- -- Basically, *transport* is a TCP connection speaking one of -- Tarantool network protocols. This is a low-level interface.
@@ -85,16 +114,17 @@ local function next_id(id) return band(id + 1, 0x7FFFFFFF) end -- -- Transport state machine: -- --- State machine starts in 'initial' state. Connect method changes --- the state to 'connecting' and spawns a worker fiber. Close --- method sets the state to 'closed' and kills the worker. --- If the transport is already in 'error' state close() does nothing. +-- State machine starts in 'initial' state. Start method +-- spawns a worker fiber serving an established connection. +-- Stop method sets the state to 'closed' and kills the worker. +-- If the transport is already in 'error' state stop() does +-- nothing. -- -- State chart: -- --- initial -> connecting -> active +-- connecting -> initial +-> active -- \ --- -> auth -> fetch_schema <-> active +-- +-> auth -> fetch_schema <-> active -- -- (any state, on error) -> error_reconnect -> connecting -> ... -- \ @@ -121,15 +151,15 @@ local function next_id(id) return band(id + 1, 0x7FFFFFFF) end -- Suggestion for callback writers: sleep a few secs before approving -- reconnect. -- -local function create_transport(host, port, user, password, callback) +local function create_transport(host, port, user, password, callback, + connection, greeting) -- check / normalize credentials if user == nil and password ~= nil then box.error(E_PROC_LUA, 'net.box: user is not defined') end if user ~= nil and password == nil then password = '' end - -- state: current state, only the worker fiber and connect method - -- change state + -- Current state machine's state.
local state = 'initial' local last_errno local last_error @@ -145,7 +175,6 @@ local function create_transport(host, port, user, password, callback) local next_request_id = 1 local worker_fiber - local connection local send_buf = buffer.ibuf(buffer.READAHEAD) local recv_buf = buffer.ibuf(buffer.READAHEAD) @@ -185,16 +214,19 @@ local function create_transport(host, port, user, password, callback) return state == target_state or target_state[state] or false end - -- CONNECT/CLOSE -- + -- START/STOP -- local protocol_sm - local function connect() + local function start() if state ~= 'initial' then return not is_final_state[state] end - set_state('connecting') + if not connection then + set_state('error', E_NO_CONNECTION, greeting) + return + end fiber.create(function() worker_fiber = fiber_self() fiber.name(string.format('%s:%s (net.box)', host, port), {truncate=true}) - ::reconnect:: + ::handle_connection:: local ok, err = pcall(protocol_sm) if not (ok or is_final_state[state]) then set_state('error', E_UNKNOWN, err) @@ -204,21 +236,34 @@ local function create_transport(host, port, user, password, callback) connection = nil end local timeout = callback('reconnect_timeout') - if timeout and state == 'error_reconnect' then + -- A non-nil reconnect_timeout means that the + -- reconnect_after option is set. + while timeout and state == 'error_reconnect' do fiber.sleep(timeout) + -- During sleep, a connection could be garbage + -- collected, or explicitly closed by a user, or + -- the reconnect_after could be reset. Check the + -- timeout again.
timeout = callback('reconnect_timeout') - if timeout and state == 'error_reconnect' then - goto reconnect + if not timeout or state ~= 'error_reconnect' then + break + end + connection, greeting = + establish_connection(host, port, + callback('fetch_connect_timeout')) + if connection then + goto handle_connection end + set_state('error_reconnect', E_NO_CONNECTION, greeting) + timeout = callback('reconnect_timeout') end send_buf:recycle() recv_buf:recycle() worker_fiber = nil end) - return true end - local function close() + local function stop() if not is_final_state[state] then set_state('closed', E_NO_CONNECTION, 'Connection closed') end @@ -367,27 +412,14 @@ local function create_transport(host, port, user, password, callback) -- collected. See gh-3164, where because of reconnect sleeps -- in this function, a connection could not be deleted. -- - protocol_sm = function () - local tm_begin, tm = fiber.clock(), callback('fetch_connect_timeout') - connection = socket.tcp_connect(host, port, tm) - if connection == nil then - return error_sm(E_NO_CONNECTION, errno.strerror(errno())) - end - local size = IPROTO_GREETING_SIZE - local err, msg = send_and_recv(size, tm - (fiber.clock() - tm_begin)) - if err then - return error_sm(err, msg) - end - local g = decode_greeting(ffi.string(recv_buf.rpos, size)) - recv_buf.rpos = recv_buf.rpos + size - if not g then - return error_sm(E_NO_CONNECTION, 'Can\'t decode handshake') - end - err, msg = callback('handshake', g) + protocol_sm = function() + assert(connection) + assert(greeting) + local err, msg = callback('handshake', greeting) if err then return error_sm(err, msg) end - if g.protocol == 'Lua console' then + if greeting.protocol == 'Lua console' then local setup_delimiter = 'require("console").delimiter("$EOF$")\n' method_codec.inject(send_buf, nil, nil, setup_delimiter) local err, response = send_and_recv_console() @@ -399,10 +431,11 @@ local function create_transport(host, port, user, password, callback) local rid =
next_request_id set_state('active') return console_sm(rid) - elseif g.protocol == 'Binary' then - return iproto_auth_sm(g.salt) + elseif greeting.protocol == 'Binary' then + return iproto_auth_sm(greeting.salt) else - return error_sm(E_NO_CONNECTION, 'Unknown protocol: ' .. g.protocol) + return error_sm(E_NO_CONNECTION, + 'Unknown protocol: '..greeting.protocol) end end @@ -524,14 +557,14 @@ local function create_transport(host, port, user, password, callback) end return { - close = close, - connect = connect, + stop = stop, + start = start, wait_state = wait_state, perform_request = perform_request } end --- Wrap create_transport, adding auto-close-on-GC feature. +-- Wrap create_transport, adding auto-stop-on-GC feature. -- All the GC magic is neatly encapsulated! -- The tricky part is the callback: -- * callback (typically) references the transport (indirectly); @@ -539,23 +572,24 @@ end -- * fibers are GC roots - i.e. transport is never GC-ed! -- We solve the issue by making the worker->callback ref weak. -- Now it is necessary to have a strong ref to callback somewhere or --- it is GC-ed prematurely. We wrap close() method, stashing the --- ref in an upvalue (close() performance doesn't matter much.) -local create_transport = function(host, port, user, password, callback) +-- it is GC-ed prematurely. We wrap stop() method, stashing the +-- ref in an upvalue (stop() performance doesn't matter much.) +local create_transport = function(host, port, user, password, callback, + connection, greeting) local weak_refs = setmetatable({callback = callback}, {__mode = 'v'}) local function weak_callback(...) local callback = weak_refs.callback if callback then return callback(...)
end end local transport = create_transport(host, port, user, - password, weak_callback) - local transport_close = transport.close + local transport = create_transport(host, port, user, password, + weak_callback, connection, greeting) + local transport_stop = transport.stop local gc_hook = ffi.gc(ffi.new('char[1]'), function() - pcall(transport_close) + pcall(transport_stop) end) - transport.close = function() + transport.stop = function() -- dummy gc_hook, callback refs prevent premature GC - return transport_close(gc_hook, callback) + return transport_stop(gc_hook, callback) end return transport end @@ -612,8 +646,7 @@ local console_mt = { local space_metatable, index_metatable -local function connect(...) - local host, port, opts = parse_connect_params(...) +local function new_sm(host, port, opts, connection, greeting) local user, password = opts.user, opts.password; opts.password = nil local last_reconnect_error local remote = {host = host, port = port, opts = opts, state = 'initial'} @@ -652,7 +685,7 @@ local function connect(...) elseif what == 'will_fetch_schema' then return not opts.console elseif what == 'fetch_connect_timeout' then - return opts.connect_timeout or 10 + return opts.connect_timeout or DEFAULT_CONNECT_TIMEOUT elseif what == 'did_fetch_schema' then remote:_install_schema(...) elseif what == 'reconnect_timeout' then @@ -679,14 +712,56 @@ local function connect(...) remote._on_schema_reload = trigger.new("on_schema_reload") remote._on_disconnect = trigger.new("on_disconnect") remote._on_connect = trigger.new("on_connect") - remote._transport = create_transport(host, port, user, password, callback) - remote._transport.connect() + remote._transport = create_transport(host, port, user, password, callback, + connection, greeting) + remote._transport.start() if opts.wait_connected ~= false then remote._transport.wait_state('active', tonumber(opts.wait_connected)) end return remote end +-- +-- Wrap an existing connection into a net.box object.
+-- @param connection Connected socket. +-- @param greeting Decoded greeting, received from a server. +-- @param host Hostname to which @a connection is established. +-- @param port TCP port to which @a connection is established. +-- @param opts Options like reconnect_after, connect_timeout, +-- wait_connected, user, password, ... +-- +-- @retval Net.box object. +-- +local function wrap(connection, greeting, host, port, opts) + if connection == nil or type(greeting) ~= 'table' then + error('Usage: netbox.wrap(socket, greeting, host, port, [opts])') + end + local copy = {}; for k, v in pairs(opts or {}) do copy[k] = v end + return new_sm(host, port, copy, connection, greeting) +end + +-- +-- Connect to a remote server. +-- @param uri OR host and port. URI is a string like +-- login:password@hostname:port. Host and port can be +-- passed separately with user and password in the next +-- parameter. +-- @param opts @Sa wrap(). +-- NOTE(review): reconnect_after does not retry a failed first attempt. +-- @retval Net.box object. +-- +local function connect(...) + local host, port, opts = parse_connect_params(...) + local connection, greeting = + establish_connection(host, port, opts.connect_timeout) + if not connection then + local dummy_conn = new_sm(host, port, opts, nil, greeting) + dummy_conn.error = greeting + return dummy_conn + end + return new_sm(host, port, opts, connection, greeting) +end + local function check_remote_arg(remote, method) if type(remote) ~= 'table' then local fmt = 'Use remote:%s(...) instead of remote.%s(...):' @@ -710,7 +785,7 @@ end function remote_methods:close() check_remote_arg(self, 'close') - self._transport.close() + self._transport.stop() end function remote_methods:on_schema_reload(...) @@ -1112,7 +1187,9 @@ end local this_module = { create_transport = create_transport, connect = connect, - new = connect -- Tarantool < 1.7.1 compatibility + new = connect, -- Tarantool < 1.7.1 compatibility + wrap = wrap, + establish_connection = establish_connection, } function this_module.timeout(timeout, ...)
diff --git a/test/box/net.box.result b/test/box/net.box.result index 46d85b327..c541cad60 100644 --- a/test/box/net.box.result +++ b/test/box/net.box.result @@ -1754,7 +1754,7 @@ nb = net.new('localhost:3392', { }); --- ... -nb.error == "Timeout exceeded" or nb.error == "Connection timed out"; +assert(nb.error == 'Operation timed out' or nb.error == 'Connection timed out'); --- - true ... @@ -2292,12 +2292,97 @@ weak.c --- - null ... -box.schema.user.revoke('guest', 'execute', 'universe') +-- +-- gh-2677: netbox supports console connections, which complicates +-- both console and netbox. It was necessary because, before a +-- connection is established, a console does not know whether it +-- speaks the binary or the text protocol, and netbox could not be +-- created from an existing socket. +-- +box.schema.user.grant('guest','read,write,execute','universe') +--- +... +urilib = require('uri') +--- +... +uri = urilib.parse(tostring(box.cfg.listen)) +--- +... +s, greeting = net.establish_connection(uri.host, uri.service) +--- +... +c = net.wrap(s, greeting, uri.host, uri.service, {reconnect_after = 0.01}) +--- +... +c.state +--- +- active +... +a = 100 +--- +... +function kek(args) return {1, 2, 3, args} end +--- +... +c:eval('a = 200') +--- +... +a +--- +- 200 +... +c:call('kek', {300}) +--- +- [1, 2, 3, 300] +... +s = box.schema.create_space('test') +--- +... +pk = s:create_index('pk') +--- +... +c:reload_schema() +--- +... +c.space.test:replace{1} +--- +- [1] +... +c.space.test:get{1} +--- +- [1] +... +c.space.test:delete{1} +--- +- [1] +... +-- +-- Break a connection to test reconnect_after. +-- +_ = c._transport.perform_request(nil, nil, 'inject', nil, '\x80') +--- +... +c.state +--- +- error_reconnect +... +while not c:is_connected() do fiber.sleep(0.01) end +--- +... +c:ping() +--- +- true +... +s:drop() --- ... c:close() --- ... -c = nil +c.state +--- +- closed +... +box.schema.user.revoke('guest', 'read,write,execute', 'universe') --- ...
diff --git a/test/box/net.box.test.lua b/test/box/net.box.test.lua index 87e26f84c..d828ac529 100644 --- a/test/box/net.box.test.lua +++ b/test/box/net.box.test.lua @@ -701,7 +701,7 @@ nb = net.new('localhost:3392', { wait_connected = true, console = true, connect_timeout = 0.01 }); -nb.error == "Timeout exceeded" or nb.error == "Connection timed out"; +assert(nb.error == 'Operation timed out' or nb.error == 'Connection timed out'); nb:close(); -- we must get peer closed nb = net.new('localhost:3392', { @@ -937,6 +937,41 @@ collectgarbage('collect') -- connection is deleted by 'collect'. weak.c -box.schema.user.revoke('guest', 'execute', 'universe') +-- +-- gh-2677: netbox supports console connections, which complicates +-- both console and netbox. It was necessary because, before a +-- connection is established, a console does not know whether it +-- speaks the binary or the text protocol, and netbox could not be +-- created from an existing socket. +-- +box.schema.user.grant('guest','read,write,execute','universe') +urilib = require('uri') +uri = urilib.parse(tostring(box.cfg.listen)) +s, greeting = net.establish_connection(uri.host, uri.service) +c = net.wrap(s, greeting, uri.host, uri.service, {reconnect_after = 0.01}) +c.state + +a = 100 +function kek(args) return {1, 2, 3, args} end +c:eval('a = 200') +a +c:call('kek', {300}) +s = box.schema.create_space('test') +pk = s:create_index('pk') +c:reload_schema() +c.space.test:replace{1} +c.space.test:get{1} +c.space.test:delete{1} +-- +-- Break a connection to test reconnect_after. +-- +_ = c._transport.perform_request(nil, nil, 'inject', nil, '\x80') +c.state +while not c:is_connected() do fiber.sleep(0.01) end +c:ping() + +s:drop() c:close() -c = nil +c.state + +box.schema.user.revoke('guest', 'read,write,execute', 'universe') -- 2.14.3 (Apple Git-98)