[tarantool-patches] [PATCH v2 1/3] netbox: allow to create a netbox connection from existing socket

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Wed Mar 28 16:35:48 MSK 2018


This is needed to create a binary console connection when the
socket is already created and the greeting has already been read
and decoded.
---
 src/box/lua/net_box.lua   | 199 ++++++++++++++++++++++++++++++++--------------
 test/box/net.box.result   |  91 ++++++++++++++++++++-
 test/box/net.box.test.lua |  41 +++++++++-
 3 files changed, 264 insertions(+), 67 deletions(-)

diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index 87c8c548b..5691dcb9e 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -31,6 +31,7 @@ local sequence_mt      = { __serialize = 'sequence' }
 local TIMEOUT_INFINITY = 500 * 365 * 86400
 local VSPACE_ID        = 281
 local VINDEX_ID        = 289
+local DEFAULT_CONNECT_TIMEOUT = 10
 
 local IPROTO_STATUS_KEY    = 0x00
 local IPROTO_ERRNO_MASK    = 0x7FFF
@@ -70,9 +71,37 @@ local method_codec           = {
 
 local function next_id(id) return band(id + 1, 0x7FFFFFFF) end
 
--- function create_transport(host, port, user, password, callback)
 --
--- Transport methods: connect(), close(), perfrom_request(), wait_state()
+-- Connect to a remote server and read its greeting.
+-- @param host Hostname.
+-- @param port TCP port.
+-- @param timeout Timeout to connect and receive greeting,
+--        DEFAULT_CONNECT_TIMEOUT if not set.
+-- @retval nil, err Error occurred. The reason is returned.
+-- @retval two non-nils A connected socket and a decoded greeting.
+--
+local function establish_connection(host, port, timeout)
+    timeout = timeout or DEFAULT_CONNECT_TIMEOUT
+    local begin = fiber.clock()
+    local s = socket.tcp_connect(host, port, timeout)
+    if not s then
+        return nil, errno.strerror(errno())
+    end
+    local msg = s:read({chunk = IPROTO_GREETING_SIZE},
+                        timeout - (fiber.clock() - begin))
+    if not msg or #msg ~= IPROTO_GREETING_SIZE then
+        local err = msg and 'Peer closed' or s:error()
+        s:close()
+        return nil, err
+    end
+    local greeting, err = decode_greeting(msg)
+    if not greeting then
+        s:close()
+        return nil, err or 'Can\'t decode handshake'
+    end
+    return s, greeting
+end
+
 --
 -- Basically, *transport* is a TCP connection speaking one of
 -- Tarantool network protocols. This is a low-level interface.
@@ -85,16 +114,17 @@ local function next_id(id) return band(id + 1, 0x7FFFFFFF) end
 --
 -- Transport state machine:
 --
--- State machine starts in 'initial' state. Connect method changes
--- the state to 'connecting' and spawns a worker fiber. Close
--- method sets the state to 'closed' and kills the worker.
--- If the transport is already in 'error' state close() does nothing.
+-- State machine starts in 'initial' state. The start() method
+-- takes the connection passed to create_transport() and spawns
+-- a worker fiber. The stop() method sets the state to 'closed'
+-- and kills the worker. If the transport is already in 'error'
+-- state stop() does nothing.
 --
 -- State chart:
 --
---  initial -> connecting -> active
+-- connecting -> initial +-> active
 --                        \
---                          -> auth -> fetch_schema <-> active
+--                         +-> auth -> fetch_schema <-> active
 --
 --  (any state, on error) -> error_reconnect -> connecting -> ...
 --                                           \
@@ -121,15 +151,15 @@ local function next_id(id) return band(id + 1, 0x7FFFFFFF) end
 -- Suggestion for callback writers: sleep a few secs before approving
 -- reconnect.
 --
-local function create_transport(host, port, user, password, callback)
+local function create_transport(host, port, user, password, callback,
+                                connection, greeting)
     -- check / normalize credentials
     if user == nil and password ~= nil then
         box.error(E_PROC_LUA, 'net.box: user is not defined')
     end
     if user ~= nil and password == nil then password = '' end
 
-    -- state: current state, only the worker fiber and connect method
-    -- change state
+    -- Current state of the transport state machine.
     local state            = 'initial'
     local last_errno
     local last_error
@@ -145,7 +175,6 @@ local function create_transport(host, port, user, password, callback)
     local next_request_id  = 1
 
     local worker_fiber
-    local connection
     local send_buf         = buffer.ibuf(buffer.READAHEAD)
     local recv_buf         = buffer.ibuf(buffer.READAHEAD)
 
@@ -185,16 +214,19 @@ local function create_transport(host, port, user, password, callback)
         return state == target_state or target_state[state] or false
     end
 
-    -- CONNECT/CLOSE --
+    -- START/STOP --
     local protocol_sm
 
-    local function connect()
+    local function start()
         if state ~= 'initial' then return not is_final_state[state] end
-        set_state('connecting')
+        if not connection and not callback('reconnect_timeout') then
+            set_state('error', E_NO_CONNECTION, greeting)
+            return
+        end
         fiber.create(function()
             worker_fiber = fiber_self()
             fiber.name(string.format('%s:%s (net.box)', host, port), {truncate=true})
-    ::reconnect::
+    ::handle_connection::
             local ok, err = pcall(protocol_sm)
             if not (ok or is_final_state[state]) then
                 set_state('error', E_UNKNOWN, err)
@@ -204,21 +236,34 @@ local function create_transport(host, port, user, password, callback)
                 connection = nil
             end
             local timeout = callback('reconnect_timeout')
-            if timeout and state == 'error_reconnect' then
+            -- A non-nil reconnect_timeout means that the
+            -- reconnect_after option is set.
+            while timeout and state == 'error_reconnect' do
                 fiber.sleep(timeout)
+                -- During sleep, a connection could be garbage
+                -- collected, or explicitly closed by a user, or
+                -- the reconnect_after could be reset. Check the
+                -- timeout again.
                 timeout = callback('reconnect_timeout')
-                if timeout and state == 'error_reconnect' then
-                    goto reconnect
+                if not timeout or state ~= 'error_reconnect' then
+                    break
+                end
+                connection, greeting =
+                    establish_connection(host, port,
+                                         callback('fetch_connect_timeout'))
+                if connection then
+                    goto handle_connection
                 end
+                set_state('error_reconnect', E_NO_CONNECTION, greeting)
+                timeout = callback('reconnect_timeout')
             end
             send_buf:recycle()
             recv_buf:recycle()
             worker_fiber = nil
         end)
-        return true
     end
 
-    local function close()
+    local function stop()
         if not is_final_state[state] then
             set_state('closed', E_NO_CONNECTION, 'Connection closed')
         end
@@ -367,27 +412,14 @@ local function create_transport(host, port, user, password, callback)
     -- collected. See gh-3164, where because of reconnect sleeps
     -- in this function, a connection could not be deleted.
     --
-    protocol_sm = function ()
-        local tm_begin, tm = fiber.clock(), callback('fetch_connect_timeout')
-        connection = socket.tcp_connect(host, port, tm)
-        if connection == nil then
-            return error_sm(E_NO_CONNECTION, errno.strerror(errno()))
-        end
-        local size = IPROTO_GREETING_SIZE
-        local err, msg = send_and_recv(size, tm - (fiber.clock() - tm_begin))
-        if err then
-            return error_sm(err, msg)
-        end
-        local g = decode_greeting(ffi.string(recv_buf.rpos, size))
-        recv_buf.rpos = recv_buf.rpos + size
-        if not g then
-            return error_sm(E_NO_CONNECTION, 'Can\'t decode handshake')
-        end
-        err, msg = callback('handshake', g)
+    protocol_sm = function()
+        if not connection then return error_sm(E_NO_CONNECTION, greeting) end
+        assert(greeting)
+        local err, msg = callback('handshake', greeting)
         if err then
             return error_sm(err, msg)
         end
-        if g.protocol == 'Lua console' then
+        if greeting.protocol == 'Lua console' then
             local setup_delimiter = 'require("console").delimiter("$EOF$")\n'
             method_codec.inject(send_buf, nil, nil, setup_delimiter)
             local err, response = send_and_recv_console()
@@ -399,10 +431,11 @@ local function create_transport(host, port, user, password, callback)
             local rid = next_request_id
             set_state('active')
             return console_sm(rid)
-        elseif g.protocol == 'Binary' then
-            return iproto_auth_sm(g.salt)
+        elseif greeting.protocol == 'Binary' then
+            return iproto_auth_sm(greeting.salt)
         else
-            return error_sm(E_NO_CONNECTION, 'Unknown protocol: ' .. g.protocol)
+            return error_sm(E_NO_CONNECTION,
+                            'Unknown protocol: '..greeting.protocol)
         end
     end
 
@@ -524,14 +557,14 @@ local function create_transport(host, port, user, password, callback)
     end
 
     return {
-        close           = close,
-        connect         = connect,
+        stop            = stop,
+        start           = start,
         wait_state      = wait_state,
         perform_request = perform_request
     }
 end
 
--- Wrap create_transport, adding auto-close-on-GC feature.
+-- Wrap create_transport, adding auto-stop-on-GC feature.
 -- All the GC magic is neatly encapsulated!
 -- The tricky part is the callback:
 --  * callback (typically) references the transport (indirectly);
@@ -539,23 +572,24 @@ end
 --  * fibers are GC roots - i.e. transport is never GC-ed!
 -- We solve the issue by making the worker->callback ref weak.
 -- Now it is necessary to have a strong ref to callback somewhere or
--- it is GC-ed prematurely. We wrap close() method, stashing the
--- ref in an upvalue (close() performance doesn't matter much.)
-local create_transport = function(host, port, user, password, callback)
+-- it is GC-ed prematurely. We wrap the stop() method, stashing
+-- the ref in an upvalue (stop() performance doesn't matter much).
+local create_transport = function(host, port, user, password, callback,
+                                  connection, greeting)
     local weak_refs = setmetatable({callback = callback}, {__mode = 'v'})
     local function weak_callback(...)
         local callback = weak_refs.callback
         if callback then return callback(...) end
     end
-    local transport = create_transport(host, port, user,
-                                       password, weak_callback)
-    local transport_close = transport.close
+    local transport = create_transport(host, port, user, password,
+                                       weak_callback, connection, greeting)
+    local transport_stop = transport.stop
     local gc_hook = ffi.gc(ffi.new('char[1]'), function()
-        pcall(transport_close)
+        pcall(transport_stop)
     end)
-    transport.close = function()
+    transport.stop = function()
         -- dummy gc_hook, callback refs prevent premature GC
-        return transport_close(gc_hook, callback)
+        return transport_stop(gc_hook, callback)
     end
     return transport
 end
@@ -612,8 +646,7 @@ local console_mt = {
 
 local space_metatable, index_metatable
 
-local function connect(...)
-    local host, port, opts = parse_connect_params(...)
+local function new_sm(host, port, opts, connection, greeting)
     local user, password = opts.user, opts.password; opts.password = nil
     local last_reconnect_error
     local remote = {host = host, port = port, opts = opts, state = 'initial'}
@@ -652,7 +685,7 @@ local function connect(...)
         elseif what == 'will_fetch_schema' then
             return not opts.console
         elseif what == 'fetch_connect_timeout' then
-            return opts.connect_timeout or 10
+            return opts.connect_timeout or DEFAULT_CONNECT_TIMEOUT
         elseif what == 'did_fetch_schema' then
             remote:_install_schema(...)
         elseif what == 'reconnect_timeout' then
@@ -679,14 +712,56 @@ local function connect(...)
     remote._on_schema_reload = trigger.new("on_schema_reload")
     remote._on_disconnect = trigger.new("on_disconnect")
     remote._on_connect = trigger.new("on_connect")
-    remote._transport = create_transport(host, port, user, password, callback)
-    remote._transport.connect()
+    remote._transport = create_transport(host, port, user, password, callback,
+                                         connection, greeting)
+    remote._transport.start()
     if opts.wait_connected ~= false then
         remote._transport.wait_state('active', tonumber(opts.wait_connected))
     end
     return remote
 end
 
+--
+-- Wrap an existing connection into net.box API.
+-- @param connection Connected socket.
+-- @param greeting Decoded greeting, received from a server.
+-- @param host Hostname to which @a connection is established.
+-- @param port TCP port to which @a connection is established.
+-- @param opts Options like reconnect_after, connect_timeout,
+--        wait_connected, user, password, ... The table is
+--        copied, so the caller's one is not modified.
+-- @retval Net.box object.
+--
+local function wrap(connection, greeting, host, port, opts)
+    if connection == nil or type(greeting) ~= 'table' then
+        error('Usage: netbox.wrap(socket, greeting, host, port, [opts])')
+    end
+    local copy = {}; for k, v in pairs(opts or {}) do copy[k] = v end
+    return new_sm(host, port, copy, connection, greeting)
+end
+
+--
+-- Connect to a remote server.
+-- @param uri OR host and port. URI is a string like
+--        login:password@hostname:port. Host and port can be
+--        passed separately with user and password in the next
+--        parameter.
+-- @param opts @sa wrap().
+--
+-- @retval Net.box object.
+--
+local function connect(...)
+    local host, port, opts = parse_connect_params(...)
+    local connection, greeting =
+        establish_connection(host, port, opts.connect_timeout)
+    if not connection then
+        local dummy_conn = new_sm(host, port, opts, nil, greeting)
+        if dummy_conn.state ~= 'active' then dummy_conn.error = greeting end
+        return dummy_conn
+    end
+    return new_sm(host, port, opts, connection, greeting)
+end
+
 local function check_remote_arg(remote, method)
     if type(remote) ~= 'table' then
         local fmt = 'Use remote:%s(...) instead of remote.%s(...):'
@@ -710,7 +785,7 @@ end
 
 function remote_methods:close()
     check_remote_arg(self, 'close')
-    self._transport.close()
+    self._transport.stop()
 end
 
 function remote_methods:on_schema_reload(...)
@@ -1112,7 +1187,9 @@ end
 local this_module = {
     create_transport = create_transport,
     connect = connect,
-    new = connect -- Tarantool < 1.7.1 compatibility
+    new = connect, -- Tarantool < 1.7.1 compatibility
+    wrap = wrap,
+    establish_connection = establish_connection,
 }
 
 function this_module.timeout(timeout, ...)
diff --git a/test/box/net.box.result b/test/box/net.box.result
index 46d85b327..c541cad60 100644
--- a/test/box/net.box.result
+++ b/test/box/net.box.result
@@ -1754,7 +1754,7 @@ nb = net.new('localhost:3392', {
 });
 ---
 ...
-nb.error == "Timeout exceeded" or nb.error == "Connection timed out";
+nb.error == 'Operation timed out' or nb.error == 'Connection timed out';
 ---
 - true
 ...
@@ -2292,12 +2292,97 @@ weak.c
 ---
 - null
 ...
-box.schema.user.revoke('guest', 'execute', 'universe')
+--
+-- gh-2677: netbox supports console connections, which complicates
+-- both console and netbox. It was necessary because before a
+-- connection is established, a console does not know whether it
+-- is binary or text protocol, and netbox could not be created from
+-- an existing socket.
+--
+box.schema.user.grant('guest','read,write,execute','universe')
+---
+...
+urilib = require('uri')
+---
+...
+uri = urilib.parse(tostring(box.cfg.listen))
+---
+...
+s, greeting = net.establish_connection(uri.host, uri.service)
+---
+...
+c = net.wrap(s, greeting, uri.host, uri.service, {reconnect_after = 0.01})
+---
+...
+c.state
+---
+- active
+...
+a = 100
+---
+...
+function kek(args) return {1, 2, 3, args} end
+---
+...
+c:eval('a = 200')
+---
+...
+a
+---
+- 200
+...
+c:call('kek', {300})
+---
+- [1, 2, 3, 300]
+...
+s = box.schema.create_space('test')
+---
+...
+pk = s:create_index('pk')
+---
+...
+c:reload_schema()
+---
+...
+c.space.test:replace{1}
+---
+- [1]
+...
+c.space.test:get{1}
+---
+- [1]
+...
+c.space.test:delete{1}
+---
+- [1]
+...
+--
+-- Break a connection to test reconnect_after.
+--
+_ = c._transport.perform_request(nil, nil, 'inject', nil, '\x80')
+---
+...
+c.state
+---
+- error_reconnect
+...
+while not c:is_connected() do fiber.sleep(0.01) end
+---
+...
+c:ping()
+---
+- true
+...
+s:drop()
 ---
 ...
 c:close()
 ---
 ...
-c = nil
+c.state
+---
+- closed
+...
+box.schema.user.revoke('guest', 'read,write,execute', 'universe')
 ---
 ...
diff --git a/test/box/net.box.test.lua b/test/box/net.box.test.lua
index 87e26f84c..d828ac529 100644
--- a/test/box/net.box.test.lua
+++ b/test/box/net.box.test.lua
@@ -701,7 +701,7 @@ nb = net.new('localhost:3392', {
     wait_connected = true, console = true,
     connect_timeout = 0.01
 });
-nb.error == "Timeout exceeded" or nb.error == "Connection timed out";
+nb.error == 'Operation timed out' or nb.error == 'Connection timed out';
 nb:close();
 -- we must get peer closed
 nb = net.new('localhost:3392', {
@@ -937,6 +937,41 @@ collectgarbage('collect')
 -- connection is deleted by 'collect'.
 weak.c
 
-box.schema.user.revoke('guest', 'execute', 'universe')
+--
+-- gh-2677: netbox supports console connections, which complicates
+-- both console and netbox. It was necessary because before a
+-- connection is established, a console does not know whether it
+-- is binary or text protocol, and netbox could not be created from
+-- an existing socket.
+--
+box.schema.user.grant('guest','read,write,execute','universe')
+urilib = require('uri')
+uri = urilib.parse(tostring(box.cfg.listen))
+s, greeting = net.establish_connection(uri.host, uri.service)
+c = net.wrap(s, greeting, uri.host, uri.service, {reconnect_after = 0.01})
+c.state
+
+a = 100
+function kek(args) return {1, 2, 3, args} end
+c:eval('a = 200')
+a
+c:call('kek', {300})
+s = box.schema.create_space('test')
+pk = s:create_index('pk')
+c:reload_schema()
+c.space.test:replace{1}
+c.space.test:get{1}
+c.space.test:delete{1}
+--
+-- Break a connection to test reconnect_after.
+--
+_ = c._transport.perform_request(nil, nil, 'inject', nil, '\x80')
+c.state
+while not c:is_connected() do fiber.sleep(0.01) end
+c:ping()
+
+s:drop()
 c:close()
-c = nil
+c.state
+
+box.schema.user.revoke('guest', 'read,write,execute', 'universe')
-- 
2.14.3 (Apple Git-98)





More information about the Tarantool-patches mailing list