[patches] [PATCH 1/1] netbox: fix leak of connection with set reconnect_after option

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Wed Feb 21 22:18:10 MSK 2018


If a connection has reconnect_after > 0, then it is never deleted
until it is explicitly closed or reconnect_after is reset. It is
because worker fiber of such connection is never stoped until
close() and holds all references.

Fix it by restarting the worker fiber for reconnection from the
function that holds only weak references. After the fix, there
is a period between two reconnect attempts, when a worker is
finished and all references are weak. And Lua garbage collector
becomes able to delete this connection.

Closes #3164

Signed-off-by: Vladislav Shpilevoy <v.shpilevoy at tarantool.org>
---
 src/box/lua/net_box.lua   | 71 +++++++++++++++++++++++++++++++++---------
 test/box/net.box.result   | 79 ++++++++++++++++++++++++++++++++++++++++++++---
 test/box/net.box.test.lua | 32 ++++++++++++++++++-
 3 files changed, 163 insertions(+), 19 deletions(-)

diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index 396dc39eb..e8e8546a7 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -112,10 +112,11 @@ local function next_id(id) return band(id + 1, 0x7FFFFFFF) end
 -- The following events are delivered, with arguments:
 --
 --  'state_changed', state, errno, error
---  'handshake', greeting           -> nil (accept) / errno, error (reject)
---  'will_fetch_schema'             -> true (approve) / false (skip fetch)
+--  'handshake', greeting -> nil (accept) / errno, error (reject)
+--  'will_fetch_schema'   -> true (approve) / false (skip fetch)
 --  'did_fetch_schema', schema_version, spaces, indices
---  'will_reconnect', errno, error  -> true (approve) / false (reject)
+--  'reconnect_timeout'   -> get reconnect timeout if set and > 0,
+--                           else nil is returned.
 --
 -- Suggestion for callback writers: sleep a few secs before approving
 -- reconnect.
@@ -187,10 +188,47 @@ local function create_transport(host, port, user, password, callback)
     -- CONNECT/CLOSE --
     local protocol_sm
 
+    --
+    -- Background function for one reconnect attempt. Connection
+    -- state machine can not reconnect self, because else it would
+    -- be infinite state machine. When a state machine is
+    -- infinite, it holds all references forever, and the
+    -- connection can not be garbage collected (see gh-3164). So
+    -- the state machine on failed connection must be finished and
+    -- restarted after reconnect timeout. And during this interval
+    -- between two connection attempts all references must be
+    -- released to allow garbage collector delete the connection,
+    -- if needed. To achive this temporary references freeing Lua
+    -- has weak references.
+    -- @param worker_f Connection state machine processor.
+    --
+    local function reconnect_f(worker_f)
+        -- All connection internals are referenced by worker_f,
+        -- so it is enough to create a weak reference to only it.
+        local weak_worker =
+            setmetatable({f = worker_f}, {__mode = 'v'})
+        -- Release strong reference.
+        worker_f = nil
+        local timeout = callback('reconnect_timeout')
+        if not timeout then
+            return
+        end
+        fiber.sleep(timeout)
+        -- During sleeping a connection could be garbage collected
+        -- and a weak reference becomes null, or reconnect_after
+        -- option could be reset, or the connection could be
+        -- closed.
+        timeout = callback('reconnect_timeout')
+        if not timeout or not weak_worker.f or state == 'closed' then
+            return
+        end
+        fiber.create(weak_worker.f)
+    end
+
     local function connect()
         if state ~= 'initial' then return not is_final_state[state] end
         set_state('connecting')
-        fiber.create(function()
+        local function worker_f()
             worker_fiber = fiber_self()
             fiber.name(string.format('%s:%s (net.box)', host, port), {truncate=true})
             local ok, err = pcall(protocol_sm)
@@ -204,7 +242,11 @@ local function create_transport(host, port, user, password, callback)
             send_buf:recycle()
             recv_buf:recycle()
             worker_fiber = nil
-        end)
+            if state == 'error_reconnect' then
+                fiber.create(reconnect_f, worker_f)
+            end
+        end
+        fiber.create(worker_f)
         return true
     end
 
@@ -496,12 +538,12 @@ local function create_transport(host, port, user, password, callback)
         if connection then connection:close(); connection = nil end
         send_buf:recycle()
         recv_buf:recycle()
-        set_state('error_reconnect', err, msg)
-        if callback('will_reconnect', err, msg) then
-            set_state('connecting')
-            return protocol_sm()
-        else
-            set_state('error', err, msg)
+        if state ~= 'closed' then
+            if callback('reconnect_timeout') then
+                set_state('error_reconnect', err, msg)
+            else
+                set_state('error', err, msg)
+            end
         end
     end
 
@@ -629,9 +671,10 @@ local function connect(...)
             return opts.connect_timeout or 10
         elseif what == 'did_fetch_schema' then
             remote:_install_schema(...)
-        elseif what == 'will_reconnect' then
-            if type(opts.reconnect_after) == 'number' then
-                fiber.sleep(opts.reconnect_after); return true
+        elseif what == 'reconnect_timeout' then
+            if type(opts.reconnect_after) == 'number' and
+               opts.reconnect_after > 0 then
+                return opts.reconnect_after
             end
         end
     end
diff --git a/test/box/net.box.result b/test/box/net.box.result
index fcd441856..9e0571c63 100644
--- a/test/box/net.box.result
+++ b/test/box/net.box.result
@@ -1962,10 +1962,6 @@ test_run:cmd('stop server connecter')
 ---
 - true
 ...
-test_run:cmd('cleanup server connecter')
----
-- true
-...
 --
 -- gh-2401 update pseudo objects not replace them
 --
@@ -2134,6 +2130,81 @@ disconnected -- true
 box.session.on_disconnect(nil, on_disconnect)
 ---
 ...
+--
+-- gh-3164: netbox connection is not closed and garbage collected
+-- ever, if reconnect_after is set.
+--
+test_run:cmd('start server connecter')
+---
+- true
+...
+test_run:cmd("set variable connect_to to 'connecter.listen'")
+---
+- true
+...
+weak = setmetatable({}, {__mode = 'v'})
+---
+...
+-- Create strong and weak reference. Weak is valid until strong
+-- is valid too.
+strong = net.connect(connect_to, {reconnect_after = 0.5})
+---
+...
+weak.c = strong
+---
+...
+weak.c:ping()
+---
+- true
+...
+test_run:cmd('stop server connecter')
+---
+- true
+...
+test_run:cmd('cleanup server connecter')
+---
+- true
+...
+-- Check the connection tries to reconnect at least two times.
+log.info(string.rep('a', 1000))
+---
+...
+while test_run:grep_log('default', 'Connection refused', 1000) == nil do fiber.sleep(0.1) end
+---
+...
+log.info(string.rep('a', 1000))
+---
+...
+while test_run:grep_log('default', 'Connection refused', 1000) == nil do fiber.sleep(0.1) end
+---
+...
+collectgarbage('collect')
+---
+- 0
+...
+strong.state
+---
+- error_reconnect
+...
+strong == weak.c
+---
+- true
+...
+-- Remove single strong reference. Now connection must be garbage
+-- collected.
+strong = nil
+---
+...
+collectgarbage('collect')
+---
+- 0
+...
+-- Now weak.c is null, because it was weak reference, and the
+-- connection is deleted by 'collect'.
+weak.c
+---
+- null
+...
 box.schema.user.revoke('guest', 'execute', 'universe')
 ---
 ...
diff --git a/test/box/net.box.test.lua b/test/box/net.box.test.lua
index 378dfd4ab..4b5f32426 100644
--- a/test/box/net.box.test.lua
+++ b/test/box/net.box.test.lua
@@ -789,7 +789,6 @@ disconnected_cnt
 conn:close()
 disconnected_cnt
 test_run:cmd('stop server connecter')
-test_run:cmd('cleanup server connecter')
 
 --
 -- gh-2401 update pseudo objects not replace them
@@ -871,4 +870,35 @@ while disconnected == false do fiber.sleep(0.01) end
 disconnected -- true
 
 box.session.on_disconnect(nil, on_disconnect)
+
+--
+-- gh-3164: netbox connection is not closed and garbage collected
+-- ever, if reconnect_after is set.
+--
+test_run:cmd('start server connecter')
+test_run:cmd("set variable connect_to to 'connecter.listen'")
+weak = setmetatable({}, {__mode = 'v'})
+-- Create strong and weak reference. Weak is valid until strong
+-- is valid too.
+strong = net.connect(connect_to, {reconnect_after = 0.5})
+weak.c = strong
+weak.c:ping()
+test_run:cmd('stop server connecter')
+test_run:cmd('cleanup server connecter')
+-- Check the connection tries to reconnect at least two times.
+log.info(string.rep('a', 1000))
+while test_run:grep_log('default', 'Connection refused', 1000) == nil do fiber.sleep(0.1) end
+log.info(string.rep('a', 1000))
+while test_run:grep_log('default', 'Connection refused', 1000) == nil do fiber.sleep(0.1) end
+collectgarbage('collect')
+strong.state
+strong == weak.c
+-- Remove single strong reference. Now connection must be garbage
+-- collected.
+strong = nil
+collectgarbage('collect')
+-- Now weak.c is null, because it was weak reference, and the
+-- connection is deleted by 'collect'.
+weak.c
+
 box.schema.user.revoke('guest', 'execute', 'universe')
-- 
2.14.3 (Apple Git-98)




More information about the Tarantool-patches mailing list