[patches] [PATCH 1/2] netbox: fix leak of connection with set reconnect_after option

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Thu Feb 22 22:25:59 MSK 2018


If a connection has reconnect_after > 0, then it is never deleted
until it is explicitly closed or reconnect_after is reset. This is
because the worker fiber of such a connection is never stopped
until close() and holds all references.

Fix it by restarting the worker fiber for reconnection from a
function that holds only weak references. After the fix, there
is a period between two reconnect attempts when the worker has
finished and all references are weak, so the Lua garbage collector
is able to delete the connection.

Closes #3164

Signed-off-by: Vladislav Shpilevoy <v.shpilevoy at tarantool.org>
---
 src/box/lua/net_box.lua   | 95 +++++++++++++++++++++++++++++++++--------------
 test/box/net.box.result   | 79 +++++++++++++++++++++++++++++++++++++--
 test/box/net.box.test.lua | 32 +++++++++++++++-
 3 files changed, 174 insertions(+), 32 deletions(-)

diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index 396dc39eb..041e6d1ce 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -112,10 +112,11 @@ local function next_id(id) return band(id + 1, 0x7FFFFFFF) end
 -- The following events are delivered, with arguments:
 --
 --  'state_changed', state, errno, error
---  'handshake', greeting           -> nil (accept) / errno, error (reject)
---  'will_fetch_schema'             -> true (approve) / false (skip fetch)
+--  'handshake', greeting -> nil (accept) / errno, error (reject)
+--  'will_fetch_schema'   -> true (approve) / false (skip fetch)
 --  'did_fetch_schema', schema_version, spaces, indices
---  'will_reconnect', errno, error  -> true (approve) / false (reject)
+--  'reconnect_timeout'   -> get reconnect timeout if set and > 0,
+--                           else nil is returned.
 --
 -- Suggestion for callback writers: sleep a few secs before approving
 -- reconnect.
@@ -186,25 +187,64 @@ local function create_transport(host, port, user, password, callback)
 
     -- CONNECT/CLOSE --
     local protocol_sm
+    local reconnect_f
+
+    --
+    -- Background function to process requests until an error.
+    -- After an error if reconnect_after is set, this function
+    -- restarts self via reconnect_f function.
+    --
+    local function worker_f()
+        worker_fiber = fiber_self()
+        fiber.name(string.format('%s:%s (net.box)', host, port), {truncate=true})
+        local ok, err = pcall(protocol_sm)
+        if not (ok or is_final_state[state]) then
+            set_state('error', E_UNKNOWN, err)
+        end
+        if connection then
+            connection:close()
+            connection = nil
+        end
+        send_buf:recycle()
+        recv_buf:recycle()
+        worker_fiber = nil
+        if state == 'error_reconnect' then
+            fiber.create(reconnect_f)
+        end
+    end
+
+    --
+    -- Background function for one reconnect attempt. Connection
+    -- state machine can not reconnect self directly, because else
+    -- it would be infinite. When a state machine is infinite, it
+    -- holds all references forever, and the connection can not be
+    -- garbage collected (see gh-3164). So the state machine on
+    -- failed connection must be finished and restarted after
+    -- reconnect timeout from helper function - reconnect_f.
+    -- Between two connection attempts all references must be
+    -- released to allow garbage collector delete the connection,
+    -- if needed.
+    --
+    reconnect_f = function()
+        local timeout = callback('reconnect_timeout')
+        if not timeout then
+            return
+        end
+        fiber.sleep(timeout)
+        -- During sleeping a connection could be garbage
+        -- collected. In such a case state becomes 'closed'. Also
+        -- 'reconnect_after' option could be reset.
+        timeout = callback('reconnect_timeout')
+        if not timeout or state == 'closed' then
+            return
+        end
+        fiber.create(worker_f)
+    end
 
     local function connect()
         if state ~= 'initial' then return not is_final_state[state] end
         set_state('connecting')
-        fiber.create(function()
-            worker_fiber = fiber_self()
-            fiber.name(string.format('%s:%s (net.box)', host, port), {truncate=true})
-            local ok, err = pcall(protocol_sm)
-            if not (ok or is_final_state[state]) then
-                set_state('error', E_UNKNOWN, err)
-            end
-            if connection then
-                connection:close()
-                connection = nil
-            end
-            send_buf:recycle()
-            recv_buf:recycle()
-            worker_fiber = nil
-        end)
+        fiber.create(worker_f)
         return true
     end
 
@@ -496,12 +536,12 @@ local function create_transport(host, port, user, password, callback)
         if connection then connection:close(); connection = nil end
         send_buf:recycle()
         recv_buf:recycle()
-        set_state('error_reconnect', err, msg)
-        if callback('will_reconnect', err, msg) then
-            set_state('connecting')
-            return protocol_sm()
-        else
-            set_state('error', err, msg)
+        if state ~= 'closed' then
+            if callback('reconnect_timeout') then
+                set_state('error_reconnect', err, msg)
+            else
+                set_state('error', err, msg)
+            end
         end
     end
 
@@ -629,9 +669,10 @@ local function connect(...)
             return opts.connect_timeout or 10
         elseif what == 'did_fetch_schema' then
             remote:_install_schema(...)
-        elseif what == 'will_reconnect' then
-            if type(opts.reconnect_after) == 'number' then
-                fiber.sleep(opts.reconnect_after); return true
+        elseif what == 'reconnect_timeout' then
+            if type(opts.reconnect_after) == 'number' and
+               opts.reconnect_after > 0 then
+                return opts.reconnect_after
             end
         end
     end
diff --git a/test/box/net.box.result b/test/box/net.box.result
index fcd441856..9e0571c63 100644
--- a/test/box/net.box.result
+++ b/test/box/net.box.result
@@ -1962,10 +1962,6 @@ test_run:cmd('stop server connecter')
 ---
 - true
 ...
-test_run:cmd('cleanup server connecter')
----
-- true
-...
 --
 -- gh-2401 update pseudo objects not replace them
 --
@@ -2134,6 +2130,81 @@ disconnected -- true
 box.session.on_disconnect(nil, on_disconnect)
 ---
 ...
+--
+-- gh-3164: netbox connection is not closed and garbage collected
+-- ever, if reconnect_after is set.
+--
+test_run:cmd('start server connecter')
+---
+- true
+...
+test_run:cmd("set variable connect_to to 'connecter.listen'")
+---
+- true
+...
+weak = setmetatable({}, {__mode = 'v'})
+---
+...
+-- Create strong and weak reference. Weak is valid until strong
+-- is valid too.
+strong = net.connect(connect_to, {reconnect_after = 0.5})
+---
+...
+weak.c = strong
+---
+...
+weak.c:ping()
+---
+- true
+...
+test_run:cmd('stop server connecter')
+---
+- true
+...
+test_run:cmd('cleanup server connecter')
+---
+- true
+...
+-- Check the connection tries to reconnect at least two times.
+log.info(string.rep('a', 1000))
+---
+...
+while test_run:grep_log('default', 'Connection refused', 1000) == nil do fiber.sleep(0.1) end
+---
+...
+log.info(string.rep('a', 1000))
+---
+...
+while test_run:grep_log('default', 'Connection refused', 1000) == nil do fiber.sleep(0.1) end
+---
+...
+collectgarbage('collect')
+---
+- 0
+...
+strong.state
+---
+- error_reconnect
+...
+strong == weak.c
+---
+- true
+...
+-- Remove single strong reference. Now connection must be garbage
+-- collected.
+strong = nil
+---
+...
+collectgarbage('collect')
+---
+- 0
+...
+-- Now weak.c is null, because it was weak reference, and the
+-- connection is deleted by 'collect'.
+weak.c
+---
+- null
+...
 box.schema.user.revoke('guest', 'execute', 'universe')
 ---
 ...
diff --git a/test/box/net.box.test.lua b/test/box/net.box.test.lua
index 378dfd4ab..4b5f32426 100644
--- a/test/box/net.box.test.lua
+++ b/test/box/net.box.test.lua
@@ -789,7 +789,6 @@ disconnected_cnt
 conn:close()
 disconnected_cnt
 test_run:cmd('stop server connecter')
-test_run:cmd('cleanup server connecter')
 
 --
 -- gh-2401 update pseudo objects not replace them
@@ -871,4 +870,35 @@ while disconnected == false do fiber.sleep(0.01) end
 disconnected -- true
 
 box.session.on_disconnect(nil, on_disconnect)
+
+--
+-- gh-3164: netbox connection is not closed and garbage collected
+-- ever, if reconnect_after is set.
+--
+test_run:cmd('start server connecter')
+test_run:cmd("set variable connect_to to 'connecter.listen'")
+weak = setmetatable({}, {__mode = 'v'})
+-- Create strong and weak reference. Weak is valid until strong
+-- is valid too.
+strong = net.connect(connect_to, {reconnect_after = 0.5})
+weak.c = strong
+weak.c:ping()
+test_run:cmd('stop server connecter')
+test_run:cmd('cleanup server connecter')
+-- Check the connection tries to reconnect at least two times.
+log.info(string.rep('a', 1000))
+while test_run:grep_log('default', 'Connection refused', 1000) == nil do fiber.sleep(0.1) end
+log.info(string.rep('a', 1000))
+while test_run:grep_log('default', 'Connection refused', 1000) == nil do fiber.sleep(0.1) end
+collectgarbage('collect')
+strong.state
+strong == weak.c
+-- Remove single strong reference. Now connection must be garbage
+-- collected.
+strong = nil
+collectgarbage('collect')
+-- Now weak.c is null, because it was weak reference, and the
+-- connection is deleted by 'collect'.
+weak.c
+
 box.schema.user.revoke('guest', 'execute', 'universe')
-- 
2.14.3 (Apple Git-98)




More information about the Tarantool-patches mailing list