[tarantool-patches] Re: [PATCH v2 1/3] netbox: allow to create a netbox connection from existing socket

Konstantin Osipov kostja at tarantool.org
Thu Mar 29 16:03:21 MSK 2018


* Vladislav Shpilevoy <v.shpilevoy at tarantool.org> [18/03/28 16:50]:
> ---

This patch looks good to me, I will push it now.

>  src/box/lua/net_box.lua   | 199 ++++++++++++++++++++++++++++++++--------------
>  test/box/net.box.result   |  91 ++++++++++++++++++++-
>  test/box/net.box.test.lua |  41 +++++++++-
>  3 files changed, 264 insertions(+), 67 deletions(-)
> 
> diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
> index 87c8c548b..5691dcb9e 100644
> --- a/src/box/lua/net_box.lua
> +++ b/src/box/lua/net_box.lua
> @@ -31,6 +31,7 @@ local sequence_mt      = { __serialize = 'sequence' }
>  local TIMEOUT_INFINITY = 500 * 365 * 86400
>  local VSPACE_ID        = 281
>  local VINDEX_ID        = 289
> +local DEFAULT_CONNECT_TIMEOUT = 10
>  
>  local IPROTO_STATUS_KEY    = 0x00
>  local IPROTO_ERRNO_MASK    = 0x7FFF
> @@ -70,9 +71,37 @@ local method_codec           = {
>  
>  local function next_id(id) return band(id + 1, 0x7FFFFFFF) end
>  
> --- function create_transport(host, port, user, password, callback)
>  --
> --- Transport methods: connect(), close(), perfrom_request(), wait_state()
> +-- Connect to a remote server, do handshake.
> +-- @param host Hostname.
> +-- @param port TCP port.
> +-- @param timeout Timeout to connect and receive greeting.
> +--
> +-- @retval nil, err Error occurred. The reason is returned.
> +-- @retval two non-nils A connected socket and a decoded greeting.
> +--
> +local function establish_connection(host, port, timeout)
> +    local timeout = timeout or DEFAULT_CONNECT_TIMEOUT
> +    local begin = fiber.clock()
> +    local s = socket.tcp_connect(host, port, timeout)
> +    if not s then
> +        return nil, errno.strerror(errno())
> +    end
> +    local msg = s:read({chunk = IPROTO_GREETING_SIZE},
> +                        timeout - (fiber.clock() - begin))
> +    if not msg then
> +        local err = s:error()
> +        s:close()
> +        return nil, err
> +    end
> +    local greeting, err = decode_greeting(msg)
> +    if not greeting then
> +        s:close()
> +        return nil, err
> +    end
> +    return s, greeting
> +end
> +
>  --
>  -- Basically, *transport* is a TCP connection speaking one of
>  -- Tarantool network protocols. This is a low-level interface.
> @@ -85,16 +114,17 @@ local function next_id(id) return band(id + 1, 0x7FFFFFFF) end
>  --
>  -- Transport state machine:
>  --
> --- State machine starts in 'initial' state. Connect method changes
> --- the state to 'connecting' and spawns a worker fiber. Close
> --- method sets the state to 'closed' and kills the worker.
> --- If the transport is already in 'error' state close() does nothing.
> +-- State machine starts in 'initial' state. New_sm method
> +-- accepts an established connection and spawns a worker fiber.
> +-- Stop method sets the state to 'closed' and kills the worker.
> +-- If the transport is already in 'error' state stop() does
> +-- nothing.
>  --
>  -- State chart:
>  --
> ---  initial -> connecting -> active
> +-- connecting -> initial +-> active
>  --                        \
> ---                          -> auth -> fetch_schema <-> active
> +--                         +-> auth -> fetch_schema <-> active
>  --
>  --  (any state, on error) -> error_reconnect -> connecting -> ...
>  --                                           \
> @@ -121,15 +151,15 @@ local function next_id(id) return band(id + 1, 0x7FFFFFFF) end
>  -- Suggestion for callback writers: sleep a few secs before approving
>  -- reconnect.
>  --
> -local function create_transport(host, port, user, password, callback)
> +local function create_transport(host, port, user, password, callback,
> +                                connection, greeting)
>      -- check / normalize credentials
>      if user == nil and password ~= nil then
>          box.error(E_PROC_LUA, 'net.box: user is not defined')
>      end
>      if user ~= nil and password == nil then password = '' end
>  
> -    -- state: current state, only the worker fiber and connect method
> -    -- change state
> +    -- Current state machine's state.
>      local state            = 'initial'
>      local last_errno
>      local last_error
> @@ -145,7 +175,6 @@ local function create_transport(host, port, user, password, callback)
>      local next_request_id  = 1
>  
>      local worker_fiber
> -    local connection
>      local send_buf         = buffer.ibuf(buffer.READAHEAD)
>      local recv_buf         = buffer.ibuf(buffer.READAHEAD)
>  
> @@ -185,16 +214,19 @@ local function create_transport(host, port, user, password, callback)
>          return state == target_state or target_state[state] or false
>      end
>  
> -    -- CONNECT/CLOSE --
> +    -- START/STOP --
>      local protocol_sm
>  
> -    local function connect()
> +    local function start()
>          if state ~= 'initial' then return not is_final_state[state] end
> -        set_state('connecting')
> +        if not connection then
> +            set_state('error', E_NO_CONNECTION)
> +            return
> +        end
>          fiber.create(function()
>              worker_fiber = fiber_self()
>              fiber.name(string.format('%s:%s (net.box)', host, port), {truncate=true})
> -    ::reconnect::
> +    ::handle_connection::
>              local ok, err = pcall(protocol_sm)
>              if not (ok or is_final_state[state]) then
>                  set_state('error', E_UNKNOWN, err)
> @@ -204,21 +236,34 @@ local function create_transport(host, port, user, password, callback)
>                  connection = nil
>              end
>              local timeout = callback('reconnect_timeout')
> -            if timeout and state == 'error_reconnect' then
> +            -- Non-nil reconnect_timeout is an indicator of set
> +            -- reconnect_after option.
> +            while timeout and state == 'error_reconnect' do
>                  fiber.sleep(timeout)
> +                -- During sleep, a connection could be garbage
> +                -- collected, or explicitly closed by a user, or
> +                -- the reconnect_after could be reset. Check the
> +                -- timeout again.
>                  timeout = callback('reconnect_timeout')
> -                if timeout and state == 'error_reconnect' then
> -                    goto reconnect
> +                if not timeout or state ~= 'error_reconnect' then
> +                    break
> +                end
> +                connection, greeting =
> +                    establish_connection(host, port,
> +                                         callback('fetch_connect_timeout'))
> +                if connection then
> +                    goto handle_connection
>                  end
> +                set_state('error_reconnect', E_NO_CONNECTION, greeting)
> +                timeout = callback('reconnect_timeout')
>              end
>              send_buf:recycle()
>              recv_buf:recycle()
>              worker_fiber = nil
>          end)
> -        return true
>      end
>  
> -    local function close()
> +    local function stop()
>          if not is_final_state[state] then
>              set_state('closed', E_NO_CONNECTION, 'Connection closed')
>          end
> @@ -367,27 +412,14 @@ local function create_transport(host, port, user, password, callback)
>      -- collected. See gh-3164, where because of reconnect sleeps
>      -- in this function, a connection could not be deleted.
>      --
> -    protocol_sm = function ()
> -        local tm_begin, tm = fiber.clock(), callback('fetch_connect_timeout')
> -        connection = socket.tcp_connect(host, port, tm)
> -        if connection == nil then
> -            return error_sm(E_NO_CONNECTION, errno.strerror(errno()))
> -        end
> -        local size = IPROTO_GREETING_SIZE
> -        local err, msg = send_and_recv(size, tm - (fiber.clock() - tm_begin))
> -        if err then
> -            return error_sm(err, msg)
> -        end
> -        local g = decode_greeting(ffi.string(recv_buf.rpos, size))
> -        recv_buf.rpos = recv_buf.rpos + size
> -        if not g then
> -            return error_sm(E_NO_CONNECTION, 'Can\'t decode handshake')
> -        end
> -        err, msg = callback('handshake', g)
> +    protocol_sm = function()
> +        assert(connection)
> +        assert(greeting)
> +        local err, msg = callback('handshake', greeting)
>          if err then
>              return error_sm(err, msg)
>          end
> -        if g.protocol == 'Lua console' then
> +        if greeting.protocol == 'Lua console' then
>              local setup_delimiter = 'require("console").delimiter("$EOF$")\n'
>              method_codec.inject(send_buf, nil, nil, setup_delimiter)
>              local err, response = send_and_recv_console()
> @@ -399,10 +431,11 @@ local function create_transport(host, port, user, password, callback)
>              local rid = next_request_id
>              set_state('active')
>              return console_sm(rid)
> -        elseif g.protocol == 'Binary' then
> -            return iproto_auth_sm(g.salt)
> +        elseif greeting.protocol == 'Binary' then
> +            return iproto_auth_sm(greeting.salt)
>          else
> -            return error_sm(E_NO_CONNECTION, 'Unknown protocol: ' .. g.protocol)
> +            return error_sm(E_NO_CONNECTION,
> +                            'Unknown protocol: '..greeting.protocol)
>          end
>      end
>  
> @@ -524,14 +557,14 @@ local function create_transport(host, port, user, password, callback)
>      end
>  
>      return {
> -        close           = close,
> -        connect         = connect,
> +        stop            = stop,
> +        start           = start,
>          wait_state      = wait_state,
>          perform_request = perform_request
>      }
>  end
>  
> --- Wrap create_transport, adding auto-close-on-GC feature.
> +-- Wrap create_transport, adding auto-stop-on-GC feature.
>  -- All the GC magic is neatly encapsulated!
>  -- The tricky part is the callback:
>  --  * callback (typically) references the transport (indirectly);
> @@ -539,23 +572,24 @@ end
>  --  * fibers are GC roots - i.e. transport is never GC-ed!
>  -- We solve the issue by making the worker->callback ref weak.
>  -- Now it is necessary to have a strong ref to callback somewhere or
> --- it is GC-ed prematurely. We wrap close() method, stashing the
> --- ref in an upvalue (close() performance doesn't matter much.)
> -local create_transport = function(host, port, user, password, callback)
> +-- it is GC-ed prematurely. We wrap stop() method, stashing the
> +-- ref in an upvalue (stop() performance doesn't matter much.)
> +local create_transport = function(host, port, user, password, callback,
> +                                  connection, greeting)
>      local weak_refs = setmetatable({callback = callback}, {__mode = 'v'})
>      local function weak_callback(...)
>          local callback = weak_refs.callback
>          if callback then return callback(...) end
>      end
> -    local transport = create_transport(host, port, user,
> -                                       password, weak_callback)
> -    local transport_close = transport.close
> +    local transport = create_transport(host, port, user, password,
> +                                       weak_callback, connection, greeting)
> +    local transport_stop = transport.stop
>      local gc_hook = ffi.gc(ffi.new('char[1]'), function()
> -        pcall(transport_close)
> +        pcall(transport_stop)
>      end)
> -    transport.close = function()
> +    transport.stop = function()
>          -- dummy gc_hook, callback refs prevent premature GC
> -        return transport_close(gc_hook, callback)
> +        return transport_stop(gc_hook, callback)
>      end
>      return transport
>  end
> @@ -612,8 +646,7 @@ local console_mt = {
>  
>  local space_metatable, index_metatable
>  
> -local function connect(...)
> -    local host, port, opts = parse_connect_params(...)
> +local function new_sm(host, port, opts, connection, greeting)
>      local user, password = opts.user, opts.password; opts.password = nil
>      local last_reconnect_error
>      local remote = {host = host, port = port, opts = opts, state = 'initial'}
> @@ -652,7 +685,7 @@ local function connect(...)
>          elseif what == 'will_fetch_schema' then
>              return not opts.console
>          elseif what == 'fetch_connect_timeout' then
> -            return opts.connect_timeout or 10
> +            return opts.connect_timeout or DEFAULT_CONNECT_TIMEOUT
>          elseif what == 'did_fetch_schema' then
>              remote:_install_schema(...)
>          elseif what == 'reconnect_timeout' then
> @@ -679,14 +712,56 @@ local function connect(...)
>      remote._on_schema_reload = trigger.new("on_schema_reload")
>      remote._on_disconnect = trigger.new("on_disconnect")
>      remote._on_connect = trigger.new("on_connect")
> -    remote._transport = create_transport(host, port, user, password, callback)
> -    remote._transport.connect()
> +    remote._transport = create_transport(host, port, user, password, callback,
> +                                         connection, greeting)
> +    remote._transport.start()
>      if opts.wait_connected ~= false then
>          remote._transport.wait_state('active', tonumber(opts.wait_connected))
>      end
>      return remote
>  end
>  
> +--
> +-- Wrap an existing connection into net.box API.
> +-- @param connection Connected socket.
> +-- @param greeting Decoded greeting, received from a server.
> +-- @param host Hostname to which @a connection is established.
> +-- @param port TCP port to which @a connection is established.
> +-- @param opts Options like reconnect_after, connect_timeout,
> +--        wait_connected, login, password, ...
> +--
> +-- @retval Net.box object.
> +--
> +local function wrap(connection, greeting, host, port, opts)
> +    if connection == nil or type(greeting) ~= 'table' then
> +        error('Usage: netbox.wrap(socket, greeting, [opts])')
> +    end
> +    opts = opts or {}
> +    return new_sm(host, port, opts, connection, greeting)
> +end
> +
> +--
> +-- Connect to a remote server.
> +-- @param uri OR host and port. URI is a string like
> +--        hostname:port@login:password. Host and port can be
> +--        passed separately with login and password in the next
> +--        parameter.
> +-- @param opts @sa wrap().
> +--
> +-- @retval Net.box object.
> +--
> +local function connect(...)
> +    local host, port, opts = parse_connect_params(...)
> +    local connection, greeting =
> +        establish_connection(host, port, opts.connect_timeout)
> +    if not connection then
> +        local dummy_conn = new_sm(host, port, opts)
> +        dummy_conn.error = greeting
> +        return dummy_conn
> +    end
> +    return new_sm(host, port, opts, connection, greeting)
> +end
> +
>  local function check_remote_arg(remote, method)
>      if type(remote) ~= 'table' then
>          local fmt = 'Use remote:%s(...) instead of remote.%s(...):'
> @@ -710,7 +785,7 @@ end
>  
>  function remote_methods:close()
>      check_remote_arg(self, 'close')
> -    self._transport.close()
> +    self._transport.stop()
>  end
>  
>  function remote_methods:on_schema_reload(...)
> @@ -1112,7 +1187,9 @@ end
>  local this_module = {
>      create_transport = create_transport,
>      connect = connect,
> -    new = connect -- Tarantool < 1.7.1 compatibility
> +    new = connect, -- Tarantool < 1.7.1 compatibility,
> +    wrap = wrap,
> +    establish_connection = establish_connection,
>  }
>  
>  function this_module.timeout(timeout, ...)
> diff --git a/test/box/net.box.result b/test/box/net.box.result
> index 46d85b327..c541cad60 100644
> --- a/test/box/net.box.result
> +++ b/test/box/net.box.result
> @@ -1754,7 +1754,7 @@ nb = net.new('localhost:3392', {
>  });
>  ---
>  ...
> -nb.error == "Timeout exceeded" or nb.error == "Connection timed out";
> +assert(nb.error == 'Operation timed out');
>  ---
>  - true
>  ...
> @@ -2292,12 +2292,97 @@ weak.c
>  ---
>  - null
>  ...
> -box.schema.user.revoke('guest', 'execute', 'universe')
> +--
> +-- gh-2677: netbox supports console connections, that complicates
> +-- both console and netbox. It was necessary because before a
> +-- connection is established, a console does not known is it
> +-- binary or text protocol, and netbox could not be created from
> +-- existing socket.
> +--
> +box.schema.user.grant('guest','read,write,execute','universe')
> +---
> +...
> +urilib = require('uri')
> +---
> +...
> +uri = urilib.parse(tostring(box.cfg.listen))
> +---
> +...
> +s, greeting = net.establish_connection(uri.host, uri.service)
> +---
> +...
> +c = net.wrap(s, greeting, uri.host, uri.service, {reconnect_after = 0.01})
> +---
> +...
> +c.state
> +---
> +- active
> +...
> +a = 100
> +---
> +...
> +function kek(args) return {1, 2, 3, args} end
> +---
> +...
> +c:eval('a = 200')
> +---
> +...
> +a
> +---
> +- 200
> +...
> +c:call('kek', {300})
> +---
> +- [1, 2, 3, 300]
> +...
> +s = box.schema.create_space('test')
> +---
> +...
> +pk = s:create_index('pk')
> +---
> +...
> +c:reload_schema()
> +---
> +...
> +c.space.test:replace{1}
> +---
> +- [1]
> +...
> +c.space.test:get{1}
> +---
> +- [1]
> +...
> +c.space.test:delete{1}
> +---
> +- [1]
> +...
> +--
> +-- Break a connection to test reconnect_after.
> +--
> +_ = c._transport.perform_request(nil, nil, 'inject', nil, '\x80')
> +---
> +...
> +c.state
> +---
> +- error_reconnect
> +...
> +while not c:is_connected() do fiber.sleep(0.01) end
> +---
> +...
> +c:ping()
> +---
> +- true
> +...
> +s:drop()
>  ---
>  ...
>  c:close()
>  ---
>  ...
> -c = nil
> +c.state
> +---
> +- closed
> +...
> +box.schema.user.revoke('guest', 'read,write,execute', 'universe')
>  ---
>  ...
> diff --git a/test/box/net.box.test.lua b/test/box/net.box.test.lua
> index 87e26f84c..d828ac529 100644
> --- a/test/box/net.box.test.lua
> +++ b/test/box/net.box.test.lua
> @@ -701,7 +701,7 @@ nb = net.new('localhost:3392', {
>      wait_connected = true, console = true,
>      connect_timeout = 0.01
>  });
> -nb.error == "Timeout exceeded" or nb.error == "Connection timed out";
> +assert(nb.error == 'Operation timed out');
>  nb:close();
>  -- we must get peer closed
>  nb = net.new('localhost:3392', {
> @@ -937,6 +937,41 @@ collectgarbage('collect')
>  -- connection is deleted by 'collect'.
>  weak.c
>  
> -box.schema.user.revoke('guest', 'execute', 'universe')
> +--
> +-- gh-2677: netbox supports console connections, that complicates
> +-- both console and netbox. It was necessary because before a
> +-- connection is established, a console does not known is it
> +-- binary or text protocol, and netbox could not be created from
> +-- existing socket.
> +--
> +box.schema.user.grant('guest','read,write,execute','universe')
> +urilib = require('uri')
> +uri = urilib.parse(tostring(box.cfg.listen))
> +s, greeting = net.establish_connection(uri.host, uri.service)
> +c = net.wrap(s, greeting, uri.host, uri.service, {reconnect_after = 0.01})
> +c.state
> +
> +a = 100
> +function kek(args) return {1, 2, 3, args} end
> +c:eval('a = 200')
> +a
> +c:call('kek', {300})
> +s = box.schema.create_space('test')
> +pk = s:create_index('pk')
> +c:reload_schema()
> +c.space.test:replace{1}
> +c.space.test:get{1}
> +c.space.test:delete{1}
> +--
> +-- Break a connection to test reconnect_after.
> +--
> +_ = c._transport.perform_request(nil, nil, 'inject', nil, '\x80')
> +c.state
> +while not c:is_connected() do fiber.sleep(0.01) end
> +c:ping()
> +
> +s:drop()
>  c:close()
> -c = nil
> +c.state
> +
> +box.schema.user.revoke('guest', 'read,write,execute', 'universe')
> -- 
> 2.14.3 (Apple Git-98)
> 
> 

-- 
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov




More information about the Tarantool-patches mailing list