From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: From: Vladislav Shpilevoy Subject: [PATCH 1/3] netbox: allow to create a netbox connection from existing socket Date: Thu, 22 Mar 2018 22:17:10 +0300 Message-Id: <10c500ef97902f645111fd2d46e756bf8dfdac1d.1521745741.git.v.shpilevoy@tarantool.org> In-Reply-To: References: In-Reply-To: References: To: tarantool-patches@freelists.org Cc: vdavydov.dev@gmail.com, Vladislav Shpilevoy List-ID: It is needed to create a binary console connection, when a socket is already created and a greeting is read and decoded. --- src/box/lua/net_box.lua | 85 ++++++++++++++++++++++++++++++++--------------- test/box/net.box.result | 78 +++++++++++++++++++++++++++++++++++++++++-- test/box/net.box.test.lua | 33 ++++++++++++++++-- 3 files changed, 166 insertions(+), 30 deletions(-) diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua index 87c8c548b..f3f69f52f 100644 --- a/src/box/lua/net_box.lua +++ b/src/box/lua/net_box.lua @@ -121,7 +121,8 @@ local function next_id(id) return band(id + 1, 0x7FFFFFFF) end -- Suggestion for callback writers: sleep a few secs before approving -- reconnect. -- -local function create_transport(host, port, user, password, callback) +local function create_transport(host, port, user, password, callback, + existing_connection, greeting) -- check / normalize credentials if user == nil and password ~= nil then box.error(E_PROC_LUA, 'net.box: user is not defined') @@ -368,26 +369,34 @@ local function create_transport(host, port, user, password, callback) -- in this function, a connection could not be deleted. -- protocol_sm = function () + local err, msg local tm_begin, tm = fiber.clock(), callback('fetch_connect_timeout') - connection = socket.tcp_connect(host, port, tm) - if connection == nil then - return error_sm(E_NO_CONNECTION, errno.strerror(errno())) - end - local size = IPROTO_GREETING_SIZE - local err, msg = send_and_recv(size, tm - (fiber.clock() - tm_begin)) - if err then - return error_sm(err, msg) - end - local g = decode_greeting(ffi.string(recv_buf.rpos, size)) - recv_buf.rpos = recv_buf.rpos + size - if not g then - return error_sm(E_NO_CONNECTION, 'Can\'t decode handshake') + if existing_connection then + connection = existing_connection + existing_connection = nil + assert(greeting) + assert(greeting.protocol ~= 'Lua console') + else + connection = socket.tcp_connect(host, port, tm) + if connection == nil then + return error_sm(E_NO_CONNECTION, errno.strerror(errno())) + end + local size = IPROTO_GREETING_SIZE + err, msg = send_and_recv(size, tm - (fiber.clock() - tm_begin)) + if err then + return error_sm(err, msg) + end + greeting = decode_greeting(ffi.string(recv_buf.rpos, size)) + recv_buf.rpos = recv_buf.rpos + size + if not greeting then + return error_sm(E_NO_CONNECTION, 'Can\'t decode handshake') + end end - err, msg = callback('handshake', g) + err, msg = callback('handshake', greeting) if err then return error_sm(err, msg) end - if g.protocol == 'Lua console' then + if greeting.protocol == 'Lua console' then local setup_delimiter = 'require("console").delimiter("$EOF$")\n' method_codec.inject(send_buf, nil, nil, setup_delimiter) local err, response = send_and_recv_console() @@ -399,10 +408,11 @@ local function create_transport(host, port, user, password, callback) local rid = next_request_id set_state('active') return console_sm(rid) - elseif g.protocol == 'Binary' then - return iproto_auth_sm(g.salt) + elseif greeting.protocol == 'Binary' then + return iproto_auth_sm(greeting.salt) else - return error_sm(E_NO_CONNECTION, 'Unknown protocol: ' .. g.protocol) + return error_sm(E_NO_CONNECTION, + 'Unknown protocol: '..greeting.protocol) end end @@ -541,14 +551,16 @@ end -- Now it is necessary to have a strong ref to callback somewhere or -- it is GC-ed prematurely. We wrap close() method, stashing the -- ref in an upvalue (close() performance doesn't matter much.) -local create_transport = function(host, port, user, password, callback) +local create_transport = function(host, port, user, password, callback, + existing_connection, greeting) local weak_refs = setmetatable({callback = callback}, {__mode = 'v'}) local function weak_callback(...) local callback = weak_refs.callback if callback then return callback(...) end end - local transport = create_transport(host, port, user, - password, weak_callback) + local transport = create_transport(host, port, user, password, + weak_callback, existing_connection, + greeting) local transport_close = transport.close local gc_hook = ffi.gc(ffi.new('char[1]'), function() pcall(transport_close) @@ -612,8 +624,7 @@ local console_mt = { local space_metatable, index_metatable -local function connect(...) - local host, port, opts = parse_connect_params(...) +local function do_connect(host, port, opts, existing_connection, greeting) local user, password = opts.user, opts.password; opts.password = nil local last_reconnect_error local remote = {host = host, port = port, opts = opts, state = 'initial'} @@ -679,7 +690,8 @@ local function connect(...) remote._on_schema_reload = trigger.new("on_schema_reload") remote._on_disconnect = trigger.new("on_disconnect") remote._on_connect = trigger.new("on_connect") - remote._transport = create_transport(host, port, user, password, callback) + remote._transport = create_transport(host, port, user, password, callback, + existing_connection, greeting) remote._transport.connect() if opts.wait_connected ~= false then remote._transport.wait_state('active', tonumber(opts.wait_connected)) @@ -687,6 +699,25 @@ local function connect(...) return remote end +local function connect(...) + local host, port, opts = parse_connect_params(...) + return do_connect(host, port, opts) +end + +local function wrap_socket(connection, greeting, url, opts) + if connection == nil or type(greeting) ~= 'table' then + error('Usage: netbox.wrap_socket(socket, greeting, [opts])') + end + if greeting.protocol == 'Lua console' then + error('Can not wrap console socket') + end + opts = opts or {} + if not opts.user and not opts.password then + opts.user, opts.password = url.login, url.password + end + return do_connect(url.host, url.service, opts, connection, greeting) +end + local function check_remote_arg(remote, method) if type(remote) ~= 'table' then local fmt = 'Use remote:%s(...) instead of remote.%s(...):' @@ -1112,7 +1143,9 @@ end local this_module = { create_transport = create_transport, connect = connect, - new = connect -- Tarantool < 1.7.1 compatibility + new = connect, -- Tarantool < 1.7.1 compatibility, + decode_greeting = internal.decode_greeting, + wrap_socket = wrap_socket, } function this_module.timeout(timeout, ...) diff --git a/test/box/net.box.result b/test/box/net.box.result index 46d85b327..cb9410085 100644 --- a/test/box/net.box.result +++ b/test/box/net.box.result @@ -2292,12 +2292,86 @@ weak.c --- - null ... -box.schema.user.revoke('guest', 'execute', 'universe') +-- +-- gh-2677: netbox supports console connections, that complicates +-- both console and netbox. It was necessary because before a +-- connection is established, a console does not known is it +-- binary or text protocol, and netbox could not be created from +-- existing socket. +-- +box.schema.user.grant('guest','read,write,execute','universe') +--- +... +urilib = require('uri') +--- +... +uri = urilib.parse(tostring(box.cfg.listen)) +--- +... +s = socket.tcp_connect(uri.host, uri.service) +--- +... +greeting = s:read({chunk = 128}) +--- +... +greeting = net.decode_greeting(greeting) +--- +... +c = net.wrap_socket(s, greeting, uri) +--- +... +c.state +--- +- active +... +a = 100 +--- +... +function kek(args) return {1, 2, 3, args} end +--- +... +c:eval('a = 200') +--- +... +a +--- +- 200 +... +c:call('kek', {300}) +--- +- [1, 2, 3, 300] +... +s = box.schema.create_space('test') +--- +... +pk = s:create_index('pk') +--- +... +c:reload_schema() +--- +... +c.space.test:replace{1} +--- +- [1] +... +c.space.test:get{1} +--- +- [1] +... +c.space.test:delete{1} +--- +- [1] +... +s:drop() --- ... c:close() --- ... -c = nil +c.state +--- +- closed +... +box.schema.user.revoke('guest', 'read,write,execute', 'universe') --- ... diff --git a/test/box/net.box.test.lua b/test/box/net.box.test.lua index 87e26f84c..487401514 100644 --- a/test/box/net.box.test.lua +++ b/test/box/net.box.test.lua @@ -937,6 +937,35 @@ collectgarbage('collect') -- connection is deleted by 'collect'. weak.c -box.schema.user.revoke('guest', 'execute', 'universe') +-- +-- gh-2677: netbox supports console connections, that complicates +-- both console and netbox. It was necessary because before a +-- connection is established, a console does not known is it +-- binary or text protocol, and netbox could not be created from +-- existing socket. +-- +box.schema.user.grant('guest','read,write,execute','universe') +urilib = require('uri') +uri = urilib.parse(tostring(box.cfg.listen)) +s = socket.tcp_connect(uri.host, uri.service) +greeting = s:read({chunk = 128}) +greeting = net.decode_greeting(greeting) +c = net.wrap_socket(s, greeting, uri) +c.state + +a = 100 +function kek(args) return {1, 2, 3, args} end +c:eval('a = 200') +a +c:call('kek', {300}) +s = box.schema.create_space('test') +pk = s:create_index('pk') +c:reload_schema() +c.space.test:replace{1} +c.space.test:get{1} +c.space.test:delete{1} +s:drop() c:close() -c = nil +c.state + +box.schema.user.revoke('guest', 'read,write,execute', 'universe') -- 2.14.3 (Apple Git-98)