[patches] [PATCH 1/2] netbox: fix leak of connection with set reconnect_after option

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Thu Feb 22 23:42:33 MSK 2018


If a connection has reconnect_after > 0, then it is never deleted
until it is explicitly closed or reconnect_after is reset. This is
because the worker fiber of such a connection holds all references
to it during a yield.

Fix it by not waiting for the next reconnection inside the state
machine - the protocol_sm() function must not run indefinitely in
case of an error.

Closes #3164

Signed-off-by: Vladislav Shpilevoy <v.shpilevoy at tarantool.org>
---
 src/box/lua/net_box.lua   | 35 ++++++++++++++-------
 test/box/net.box.result   | 79 ++++++++++++++++++++++++++++++++++++++++++++---
 test/box/net.box.test.lua | 32 ++++++++++++++++++-
 3 files changed, 129 insertions(+), 17 deletions(-)

diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index 396dc39eb..110a4942a 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -112,10 +112,11 @@ local function next_id(id) return band(id + 1, 0x7FFFFFFF) end
 -- The following events are delivered, with arguments:
 --
 --  'state_changed', state, errno, error
---  'handshake', greeting           -> nil (accept) / errno, error (reject)
---  'will_fetch_schema'             -> true (approve) / false (skip fetch)
+--  'handshake', greeting -> nil (accept) / errno, error (reject)
+--  'will_fetch_schema'   -> true (approve) / false (skip fetch)
 --  'did_fetch_schema', schema_version, spaces, indices
---  'will_reconnect', errno, error  -> true (approve) / false (reject)
+--  'reconnect_timeout'   -> get reconnect timeout if set and > 0,
+--                           else nil is returned.
 --
 -- Suggestion for callback writers: sleep a few secs before approving
 -- reconnect.
@@ -193,6 +194,7 @@ local function create_transport(host, port, user, password, callback)
         fiber.create(function()
             worker_fiber = fiber_self()
             fiber.name(string.format('%s:%s (net.box)', host, port), {truncate=true})
+    ::reconnect::
             local ok, err = pcall(protocol_sm)
             if not (ok or is_final_state[state]) then
                 set_state('error', E_UNKNOWN, err)
@@ -201,6 +203,14 @@ local function create_transport(host, port, user, password, callback)
                 connection:close()
                 connection = nil
             end
+            local timeout = callback('reconnect_timeout')
+            if timeout and state == 'error_reconnect' then
+                fiber.sleep(timeout)
+                timeout = callback('reconnect_timeout')
+                if timeout and state == 'error_reconnect' then
+                    goto reconnect
+                end
+            end
             send_buf:recycle()
             recv_buf:recycle()
             worker_fiber = nil
@@ -496,12 +506,12 @@ local function create_transport(host, port, user, password, callback)
         if connection then connection:close(); connection = nil end
         send_buf:recycle()
         recv_buf:recycle()
-        set_state('error_reconnect', err, msg)
-        if callback('will_reconnect', err, msg) then
-            set_state('connecting')
-            return protocol_sm()
-        else
-            set_state('error', err, msg)
+        if state ~= 'closed' then
+            if callback('reconnect_timeout') then
+                set_state('error_reconnect', err, msg)
+            else
+                set_state('error', err, msg)
+            end
         end
     end
 
@@ -629,9 +639,10 @@ local function connect(...)
             return opts.connect_timeout or 10
         elseif what == 'did_fetch_schema' then
             remote:_install_schema(...)
-        elseif what == 'will_reconnect' then
-            if type(opts.reconnect_after) == 'number' then
-                fiber.sleep(opts.reconnect_after); return true
+        elseif what == 'reconnect_timeout' then
+            if type(opts.reconnect_after) == 'number' and
+               opts.reconnect_after > 0 then
+                return opts.reconnect_after
             end
         end
     end
diff --git a/test/box/net.box.result b/test/box/net.box.result
index fcd441856..9e0571c63 100644
--- a/test/box/net.box.result
+++ b/test/box/net.box.result
@@ -1962,10 +1962,6 @@ test_run:cmd('stop server connecter')
 ---
 - true
 ...
-test_run:cmd('cleanup server connecter')
----
-- true
-...
 --
 -- gh-2401 update pseudo objects not replace them
 --
@@ -2134,6 +2130,81 @@ disconnected -- true
 box.session.on_disconnect(nil, on_disconnect)
 ---
 ...
+--
+-- gh-3164: netbox connection is not closed and garbage collected
+-- ever, if reconnect_after is set.
+--
+test_run:cmd('start server connecter')
+---
+- true
+...
+test_run:cmd("set variable connect_to to 'connecter.listen'")
+---
+- true
+...
+weak = setmetatable({}, {__mode = 'v'})
+---
+...
+-- Create strong and weak reference. Weak is valid until strong
+-- is valid too.
+strong = net.connect(connect_to, {reconnect_after = 0.5})
+---
+...
+weak.c = strong
+---
+...
+weak.c:ping()
+---
+- true
+...
+test_run:cmd('stop server connecter')
+---
+- true
+...
+test_run:cmd('cleanup server connecter')
+---
+- true
+...
+-- Check the connection tries to reconnect at least two times.
+log.info(string.rep('a', 1000))
+---
+...
+while test_run:grep_log('default', 'Connection refused', 1000) == nil do fiber.sleep(0.1) end
+---
+...
+log.info(string.rep('a', 1000))
+---
+...
+while test_run:grep_log('default', 'Connection refused', 1000) == nil do fiber.sleep(0.1) end
+---
+...
+collectgarbage('collect')
+---
+- 0
+...
+strong.state
+---
+- error_reconnect
+...
+strong == weak.c
+---
+- true
+...
+-- Remove single strong reference. Now connection must be garbage
+-- collected.
+strong = nil
+---
+...
+collectgarbage('collect')
+---
+- 0
+...
+-- Now weak.c is null, because it was weak reference, and the
+-- connection is deleted by 'collect'.
+weak.c
+---
+- null
+...
 box.schema.user.revoke('guest', 'execute', 'universe')
 ---
 ...
diff --git a/test/box/net.box.test.lua b/test/box/net.box.test.lua
index 378dfd4ab..4b5f32426 100644
--- a/test/box/net.box.test.lua
+++ b/test/box/net.box.test.lua
@@ -789,7 +789,6 @@ disconnected_cnt
 conn:close()
 disconnected_cnt
 test_run:cmd('stop server connecter')
-test_run:cmd('cleanup server connecter')
 
 --
 -- gh-2401 update pseudo objects not replace them
@@ -871,4 +870,35 @@ while disconnected == false do fiber.sleep(0.01) end
 disconnected -- true
 
 box.session.on_disconnect(nil, on_disconnect)
+
+--
+-- gh-3164: netbox connection is not closed and garbage collected
+-- ever, if reconnect_after is set.
+--
+test_run:cmd('start server connecter')
+test_run:cmd("set variable connect_to to 'connecter.listen'")
+weak = setmetatable({}, {__mode = 'v'})
+-- Create strong and weak reference. Weak is valid until strong
+-- is valid too.
+strong = net.connect(connect_to, {reconnect_after = 0.5})
+weak.c = strong
+weak.c:ping()
+test_run:cmd('stop server connecter')
+test_run:cmd('cleanup server connecter')
+-- Check the connection tries to reconnect at least two times.
+log.info(string.rep('a', 1000))
+while test_run:grep_log('default', 'Connection refused', 1000) == nil do fiber.sleep(0.1) end
+log.info(string.rep('a', 1000))
+while test_run:grep_log('default', 'Connection refused', 1000) == nil do fiber.sleep(0.1) end
+collectgarbage('collect')
+strong.state
+strong == weak.c
+-- Remove single strong reference. Now connection must be garbage
+-- collected.
+strong = nil
+collectgarbage('collect')
+-- Now weak.c is null, because it was weak reference, and the
+-- connection is deleted by 'collect'.
+weak.c
+
 box.schema.user.revoke('guest', 'execute', 'universe')
-- 
2.14.3 (Apple Git-98)




More information about the Tarantool-patches mailing list