[patches] [PATCH 1/2] netbox: fix leak of connection with set reconnect_after option
Vladislav Shpilevoy
v.shpilevoy at tarantool.org
Thu Feb 22 22:25:59 MSK 2018
If a connection has reconnect_after > 0, then it is never deleted
until it is explicitly closed or reconnect_after is reset. This is
because the worker fiber of such a connection is never stopped
until close() is called, and it holds all the references.
Fix it by restarting the worker fiber for reconnection from a
function that holds only weak references. After the fix, there
is a period between two reconnect attempts when the worker has
finished and all references are weak, so the Lua garbage
collector is able to delete the connection.
Closes #3164
Signed-off-by: Vladislav Shpilevoy <v.shpilevoy at tarantool.org>
---
src/box/lua/net_box.lua | 95 +++++++++++++++++++++++++++++++++--------------
test/box/net.box.result | 79 +++++++++++++++++++++++++++++++++++++--
test/box/net.box.test.lua | 32 +++++++++++++++-
3 files changed, 174 insertions(+), 32 deletions(-)
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index 396dc39eb..041e6d1ce 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -112,10 +112,11 @@ local function next_id(id) return band(id + 1, 0x7FFFFFFF) end
-- The following events are delivered, with arguments:
--
-- 'state_changed', state, errno, error
--- 'handshake', greeting -> nil (accept) / errno, error (reject)
--- 'will_fetch_schema' -> true (approve) / false (skip fetch)
+-- 'handshake', greeting -> nil (accept) / errno, error (reject)
+-- 'will_fetch_schema' -> true (approve) / false (skip fetch)
-- 'did_fetch_schema', schema_version, spaces, indices
--- 'will_reconnect', errno, error -> true (approve) / false (reject)
+-- 'reconnect_timeout' -> get reconnect timeout if set and > 0,
+-- else nil is returned.
--
-- Suggestion for callback writers: sleep a few secs before approving
-- reconnect.
@@ -186,25 +187,64 @@ local function create_transport(host, port, user, password, callback)
-- CONNECT/CLOSE --
local protocol_sm
+ local reconnect_f
+
+ --
+ -- Background function to process requests until an error.
+ -- After an error, if reconnect_after is set, this function
+ -- restarts itself via the reconnect_f function.
+ --
+ local function worker_f()
+ worker_fiber = fiber_self()
+ fiber.name(string.format('%s:%s (net.box)', host, port), {truncate=true})
+ local ok, err = pcall(protocol_sm)
+ if not (ok or is_final_state[state]) then
+ set_state('error', E_UNKNOWN, err)
+ end
+ if connection then
+ connection:close()
+ connection = nil
+ end
+ send_buf:recycle()
+ recv_buf:recycle()
+ worker_fiber = nil
+ if state == 'error_reconnect' then
+ fiber.create(reconnect_f)
+ end
+ end
+
+ --
+ -- Background function for one reconnect attempt. The connection
+ -- state machine cannot reconnect by itself, because otherwise
+ -- it would run forever. A state machine that never finishes
+ -- holds all references forever, and the connection can not be
+ -- garbage collected (see gh-3164). So the state machine of a
+ -- failed connection must be finished and restarted after the
+ -- reconnect timeout from a helper function - reconnect_f.
+ -- Between two connection attempts all references must be
+ -- released to allow the garbage collector to delete the
+ -- connection, if needed.
+ --
+ reconnect_f = function()
+ local timeout = callback('reconnect_timeout')
+ if not timeout then
+ return
+ end
+ fiber.sleep(timeout)
+ -- While sleeping, the connection could have been garbage
+ -- collected, in which case the state becomes 'closed'. The
+ -- 'reconnect_after' option could also have been reset.
+ timeout = callback('reconnect_timeout')
+ if not timeout or state == 'closed' then
+ return
+ end
+ fiber.create(worker_f)
+ end
local function connect()
if state ~= 'initial' then return not is_final_state[state] end
set_state('connecting')
- fiber.create(function()
- worker_fiber = fiber_self()
- fiber.name(string.format('%s:%s (net.box)', host, port), {truncate=true})
- local ok, err = pcall(protocol_sm)
- if not (ok or is_final_state[state]) then
- set_state('error', E_UNKNOWN, err)
- end
- if connection then
- connection:close()
- connection = nil
- end
- send_buf:recycle()
- recv_buf:recycle()
- worker_fiber = nil
- end)
+ fiber.create(worker_f)
return true
end
@@ -496,12 +536,12 @@ local function create_transport(host, port, user, password, callback)
if connection then connection:close(); connection = nil end
send_buf:recycle()
recv_buf:recycle()
- set_state('error_reconnect', err, msg)
- if callback('will_reconnect', err, msg) then
- set_state('connecting')
- return protocol_sm()
- else
- set_state('error', err, msg)
+ if state ~= 'closed' then
+ if callback('reconnect_timeout') then
+ set_state('error_reconnect', err, msg)
+ else
+ set_state('error', err, msg)
+ end
end
end
@@ -629,9 +669,10 @@ local function connect(...)
return opts.connect_timeout or 10
elseif what == 'did_fetch_schema' then
remote:_install_schema(...)
- elseif what == 'will_reconnect' then
- if type(opts.reconnect_after) == 'number' then
- fiber.sleep(opts.reconnect_after); return true
+ elseif what == 'reconnect_timeout' then
+ if type(opts.reconnect_after) == 'number' and
+ opts.reconnect_after > 0 then
+ return opts.reconnect_after
end
end
end
diff --git a/test/box/net.box.result b/test/box/net.box.result
index fcd441856..9e0571c63 100644
--- a/test/box/net.box.result
+++ b/test/box/net.box.result
@@ -1962,10 +1962,6 @@ test_run:cmd('stop server connecter')
---
- true
...
-test_run:cmd('cleanup server connecter')
----
-- true
-...
--
-- gh-2401 update pseudo objects not replace them
--
@@ -2134,6 +2130,81 @@ disconnected -- true
box.session.on_disconnect(nil, on_disconnect)
---
...
+--
+-- gh-3164: netbox connection is not closed and garbage collected
+-- ever, if reconnect_after is set.
+--
+test_run:cmd('start server connecter')
+---
+- true
+...
+test_run:cmd("set variable connect_to to 'connecter.listen'")
+---
+- true
+...
+weak = setmetatable({}, {__mode = 'v'})
+---
+...
+-- Create strong and weak reference. Weak is valid until strong
+-- is valid too.
+strong = net.connect(connect_to, {reconnect_after = 0.5})
+---
+...
+weak.c = strong
+---
+...
+weak.c:ping()
+---
+- true
+...
+test_run:cmd('stop server connecter')
+---
+- true
+...
+test_run:cmd('cleanup server connecter')
+---
+- true
+...
+-- Check the connection tries to reconnect at least two times.
+log.info(string.rep('a', 1000))
+---
+...
+while test_run:grep_log('default', 'Connection refused', 1000) == nil do fiber.sleep(0.1) end
+---
+...
+log.info(string.rep('a', 1000))
+---
+...
+while test_run:grep_log('default', 'Connection refused', 1000) == nil do fiber.sleep(0.1) end
+---
+...
+collectgarbage('collect')
+---
+- 0
+...
+strong.state
+---
+- error_reconnect
+...
+strong == weak.c
+---
+- true
+...
+-- Remove single strong reference. Now connection must be garbage
+-- collected.
+strong = nil
+---
+...
+collectgarbage('collect')
+---
+- 0
+...
+-- Now weak.c is null, because it was weak reference, and the
+-- connection is deleted by 'collect'.
+weak.c
+---
+- null
+...
box.schema.user.revoke('guest', 'execute', 'universe')
---
...
diff --git a/test/box/net.box.test.lua b/test/box/net.box.test.lua
index 378dfd4ab..4b5f32426 100644
--- a/test/box/net.box.test.lua
+++ b/test/box/net.box.test.lua
@@ -789,7 +789,6 @@ disconnected_cnt
conn:close()
disconnected_cnt
test_run:cmd('stop server connecter')
-test_run:cmd('cleanup server connecter')
--
-- gh-2401 update pseudo objects not replace them
@@ -871,4 +870,35 @@ while disconnected == false do fiber.sleep(0.01) end
disconnected -- true
box.session.on_disconnect(nil, on_disconnect)
+
+--
+-- gh-3164: netbox connection is not closed and garbage collected
+-- ever, if reconnect_after is set.
+--
+test_run:cmd('start server connecter')
+test_run:cmd("set variable connect_to to 'connecter.listen'")
+weak = setmetatable({}, {__mode = 'v'})
+-- Create strong and weak reference. Weak is valid until strong
+-- is valid too.
+strong = net.connect(connect_to, {reconnect_after = 0.5})
+weak.c = strong
+weak.c:ping()
+test_run:cmd('stop server connecter')
+test_run:cmd('cleanup server connecter')
+-- Check the connection tries to reconnect at least two times.
+log.info(string.rep('a', 1000))
+while test_run:grep_log('default', 'Connection refused', 1000) == nil do fiber.sleep(0.1) end
+log.info(string.rep('a', 1000))
+while test_run:grep_log('default', 'Connection refused', 1000) == nil do fiber.sleep(0.1) end
+collectgarbage('collect')
+strong.state
+strong == weak.c
+-- Remove single strong reference. Now connection must be garbage
+-- collected.
+strong = nil
+collectgarbage('collect')
+-- Now weak.c is null, because it was weak reference, and the
+-- connection is deleted by 'collect'.
+weak.c
+
box.schema.user.revoke('guest', 'execute', 'universe')
--
2.14.3 (Apple Git-98)
More information about the Tarantool-patches
mailing list