[patches] [PATCH vshard 5/5] router: get rid of replicaset.candidate

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Fri Mar 2 00:39:41 MSK 2018


After #76 is closed, a router holds connections to all replicas,
including slaves. Replicaset.candidate was used by failover
algorithm to try to connect to replicas with a higher priority,
than a current replica has. But now all replicas already have
connections, and candidate is not needed anymore.

Follow up #76

Signed-off-by: Vladislav Shpilevoy <v.shpilevoy at tarantool.org>
---
 test/failover/failover.result   |  67 +++++---------------------
 test/failover/failover.test.lua |  29 ++----------
 vshard/replicaset.lua           | 101 +++++++++++-----------------------------
 vshard/router/init.lua          |  46 +++---------------
 4 files changed, 48 insertions(+), 195 deletions(-)

diff --git a/test/failover/failover.result b/test/failover/failover.result
index 1a94f63..73a4250 100644
--- a/test/failover/failover.result
+++ b/test/failover/failover.result
@@ -177,6 +177,9 @@ test_run:switch('router_1')
 rs1 = vshard.router.internal.replicasets[rs_uuid[1]]
 ---
 ...
+while not rs1.replica_up_ts do fiber.sleep(0.1) end
+---
+...
 old_up_ts = rs1.replica_up_ts
 ---
 ...
@@ -252,37 +255,24 @@ test_run:switch('router_1')
 ---
 - true
 ...
--- Once per FAILOVER_UP_TIMEOUT a router tries to reconnect to the
--- best replica.
-while rs1.candidate == nil or rs1.candidate.down_ts == nil do fiber.sleep(0.1) end
----
-...
--- Check that the original replica is used, while a candidate
--- tries to connnect.
-vshard.router.call(1, 'read', 'echo', {123})
----
-- 123
-...
-test_run:switch('box_1_b')
+-- Revive the best replica. A router must reconnect to it in
+-- FAILOVER_UP_TIMEOUT seconds.
+test_run:cmd('start server box_1_d')
 ---
 - true
 ...
-echo_count
+ts1 = fiber.time()
 ---
-- 2
 ...
-test_run:switch('router_1')
+while rs1.replica.name ~= 'box_1_d' do fiber.sleep(0.1) end
 ---
-- true
 ...
--- Revive the best replica. A router must reconnect to it in
--- FAILOVER_UP_TIMEOUT seconds.
-test_run:cmd('start server box_1_d')
+ts2 = fiber.time()
 ---
-- true
 ...
-while rs1.replica.name ~= 'box_1_d' do fiber.sleep(0.1) end
+ts2 - ts1 < vshard.consts.FAILOVER_UP_TIMEOUT
 ---
+- true
 ...
 test_run:grep_log('router_1', 'New replica box_1_d%(storage%@')
 ---
@@ -309,41 +299,6 @@ rs1.replica.conn == rs1.master.conn
 ---
 - true
 ...
---
--- Ensure the candidate downs its priority until mets a current
--- replica.
---
-while not rs1.candidate or rs1.candidate.name ~= 'box_1_d' do fiber.sleep(0.1) end
----
-...
-ts1 = fiber.time()
----
-...
-while rs1.candidate.name ~= 'box_1_b' do fiber.sleep(0.1) end
----
-...
-ts2 = fiber.time()
----
-...
--- Ensure the candidate is updated more often than once per
--- UP_TIMEOUT seconds.
-ts2 - ts1 < vshard.consts.FAILOVER_UP_TIMEOUT
----
-- true
-...
-ts1 = ts2
----
-...
-while rs1.candidate.name ~= 'box_1_c' do fiber.sleep(0.1) end
----
-...
-ts2 = fiber.time()
----
-...
-ts2 - ts1 < vshard.consts.FAILOVER_UP_TIMEOUT
----
-- true
-...
 test_run:cmd('start server box_1_b with wait=False, wait_load=False')
 ---
 - true
diff --git a/test/failover/failover.test.lua b/test/failover/failover.test.lua
index bee1216..6e06314 100644
--- a/test/failover/failover.test.lua
+++ b/test/failover/failover.test.lua
@@ -75,6 +75,7 @@ echo_count
 -- Ensure that replica_up_ts is updated periodically.
 test_run:switch('router_1')
 rs1 = vshard.router.internal.replicasets[rs_uuid[1]]
+while not rs1.replica_up_ts do fiber.sleep(0.1) end
 old_up_ts = rs1.replica_up_ts
 while rs1.replica_up_ts == old_up_ts do fiber.sleep(0.1) end
 rs1.replica_up_ts - old_up_ts >= vshard.consts.FAILOVER_UP_TIMEOUT
@@ -105,20 +106,13 @@ test_run:cmd('switch box_1_b')
 echo_count
 test_run:switch('router_1')
 
--- Once per FAILOVER_UP_TIMEOUT a router tries to reconnect to the
--- best replica.
-while rs1.candidate == nil or rs1.candidate.down_ts == nil do fiber.sleep(0.1) end
--- Check that the original replica is used, while a candidate
--- tries to connnect.
-vshard.router.call(1, 'read', 'echo', {123})
-test_run:switch('box_1_b')
-echo_count
-test_run:switch('router_1')
-
 -- Revive the best replica. A router must reconnect to it in
 -- FAILOVER_UP_TIMEOUT seconds.
 test_run:cmd('start server box_1_d')
+ts1 = fiber.time()
 while rs1.replica.name ~= 'box_1_d' do fiber.sleep(0.1) end
+ts2 = fiber.time()
+ts2 - ts1 < vshard.consts.FAILOVER_UP_TIMEOUT
 test_run:grep_log('router_1', 'New replica box_1_d%(storage%@')
 
 -- Ensure the master connection is used as replica's one instead
@@ -128,21 +122,6 @@ test_run:cmd('stop server box_1_c')
 test_run:cmd('stop server box_1_d')
 while rs1.replica.name ~= 'box_1_a' do fiber.sleep(0.1) end
 rs1.replica.conn == rs1.master.conn
---
--- Ensure the candidate downs its priority until mets a current
--- replica.
---
-while not rs1.candidate or rs1.candidate.name ~= 'box_1_d' do fiber.sleep(0.1) end
-ts1 = fiber.time()
-while rs1.candidate.name ~= 'box_1_b' do fiber.sleep(0.1) end
-ts2 = fiber.time()
--- Ensure the candidate is updated more often than once per
--- UP_TIMEOUT seconds.
-ts2 - ts1 < vshard.consts.FAILOVER_UP_TIMEOUT
-ts1 = ts2
-while rs1.candidate.name ~= 'box_1_c' do fiber.sleep(0.1) end
-ts2 = fiber.time()
-ts2 - ts1 < vshard.consts.FAILOVER_UP_TIMEOUT
 
 test_run:cmd('start server box_1_b with wait=False, wait_load=False')
 test_run:cmd('start server box_1_c with wait=False, wait_load=False')
diff --git a/vshard/replicaset.lua b/vshard/replicaset.lua
index 56687be..e176945 100644
--- a/vshard/replicaset.lua
+++ b/vshard/replicaset.lua
@@ -25,8 +25,6 @@
 --      },
 --      master = <master server from the array above>,
 --      replica = <nearest available replica object>,
---      candidate = <replica with less weight, which tries to
---                   connect and replace an original replica>,
 --      replica_up_ts = <timestamp updated on each attempt to
 --                       connect to the nearest replica, and on
 --                       each connect event>,
@@ -68,8 +66,7 @@ local function netbox_on_connect(conn)
         conn:close()
         return
     end
-    if (replica == rs.replica or replica == rs.candidate) and
-       replica == rs.priority_list[1] then
+    if replica == rs.replica and replica == rs.priority_list[1] then
         -- Update replica_up_ts, if the current replica has the
         -- biggest priority. Really, it is not neccessary to
         -- increase replica connection priority, if the current
@@ -131,60 +128,6 @@ local function replicaset_connect_all(replicaset)
     end
 end
 
---
--- Make a replica be used for read requests or be candidate.
--- @param replicaset Replicaset for which a replica is set.
--- @param replica Replica to be used for read requests.
--- @param read_name Either replica or candidate. Both of them can
---        be updated independently (@sa update_candidate(),
---        down_priority()).
---
-local function replicaset_make_replica_read(replicaset, replica, read_name)
-    assert(read_name == 'replica' or read_name == 'candidate')
-    assert(replicaset[read_name] ~= replica)
-    replicaset_connect_to_replica(replicaset, replica)
-    replicaset[read_name] = replica
-end
-
---
--- Try to connect to another candidate. There is two cases:
--- either
--- * it is time to reconnect to the nearest replica - choose
---   first replica in priority list, or
--- * current candidate can not connect to a server during
---   DOWN_TIMEOUT seconds - then the candidate is set to a next by
---   priority.
---
--- New connection is stored into candidate. It replaces an
--- original replica when connected.
---
-local function replicaset_update_candidate(replicaset)
-    local old_candidate = replicaset.candidate
-    local new_candidate
-    local curr_ts = fiber.time()
-    local up_ts = replicaset.replica_up_ts
-    if not old_candidate or not up_ts or
-       curr_ts - up_ts >= consts.FAILOVER_UP_TIMEOUT then
-        new_candidate = replicaset.priority_list[1]
-        -- Update timestamp of the last attempt to connect to the
-        -- best replica.
-        replicaset.replica_up_ts = curr_ts
-        -- It is possible, that the current replica already has
-        -- the best priority. In such a case there is no need
-        -- to create candidate.
-        if new_candidate == replicaset.replica or
-           new_candidate == old_candidate then
-            return
-        end
-    else
-        assert(old_candidate.next_by_priority and old_candidate.down_ts and
-               curr_ts - old_candidate.down_ts >= consts.FAILOVER_DOWN_TIMEOUT
-               and old_candidate.next_by_priority ~= replicaset.replica)
-        new_candidate = old_candidate.next_by_priority
-    end
-    replicaset_make_replica_read(replicaset, new_candidate, 'candidate')
-end
-
 --
 -- Connect to a next replica with less priority against a current
 -- one. It is needed, if a current replica's connection is down
@@ -194,27 +137,39 @@ local function replicaset_down_replica_priority(replicaset)
     local old_replica = replicaset.replica
     assert(old_replica and old_replica.down_ts and
            not old_replica:is_connected())
-    local new_replica = replicaset.replica.next_by_priority
+    local new_replica = old_replica.next_by_priority
     if new_replica then
-        replicaset_make_replica_read(replicaset, new_replica, 'replica')
+        assert(new_replica ~= old_replica)
+        replicaset_connect_to_replica(replicaset, new_replica)
+        replicaset.replica = new_replica
     end
     -- Else the current replica already has the lowest priority.
     -- Can not down it.
 end
 
 --
--- Set candidate as the current replica. Candidate attribute is
--- nullified and can be reused, for example, to try to connect to
--- the nearest replica.
+-- Search a replica with higher priority than a current replica
+-- has.
 --
-local function replicaset_set_candidate_as_replica(replicaset)
-    assert(replicaset.candidate)
+local function replicaset_up_replica_priority(replicaset)
     local old_replica = replicaset.replica
-    replicaset.replica = replicaset.candidate
-    assert(not old_replica or
-           old_replica.weight >= replicaset.replica.weight and
-           old_replica ~= replicaset.replica)
-    replicaset.candidate = nil
+    if old_replica == replicaset.priority_list[1] and
+       old_replica:is_connected() then
+        replicaset.replica_up_ts = fiber.time()
+        return
+    end
+    for _, replica in pairs(replicaset.priority_list) do
+        if replica == old_replica then
+            -- Failed to up priority.
+            return
+        end
+        if replica:is_connected() then
+            replicaset.replica = replica
+            assert(not old_replica or
+                   old_replica.weight >= replicaset.replica.weight)
+            return
+        end
+    end
 end
 
 --
@@ -414,9 +369,8 @@ local replicaset_mt = {
         connect_all = replicaset_connect_all;
         connect_replica = replicaset_connect_to_replica;
         rebind_connections = replicaset_rebind_connections;
-        update_candidate = replicaset_update_candidate;
         down_replica_priority = replicaset_down_replica_priority;
-        set_candidate_as_replica = replicaset_set_candidate_as_replica;
+        up_replica_priority = replicaset_up_replica_priority;
         call = replicaset_master_call;
         callrw = replicaset_master_call;
         callro = replicaset_nearest_call;
@@ -580,9 +534,6 @@ local function destroy(replicasets)
         if rs.replica and rs.replica.conn then
             rs.replica.conn:close()
         end
-        if rs.candidate and rs.candidate.conn then
-            rs.candidate.conn:close()
-        end
     end
 end
 
diff --git a/vshard/router/init.lua b/vshard/router/init.lua
index d4d5aed..a57bd09 100644
--- a/vshard/router/init.lua
+++ b/vshard/router/init.lua
@@ -335,39 +335,12 @@ local function failover_need_down_priority(replicaset, curr_ts)
 end
 
 --
--- Replicaset must try to connect to a server with the highest
--- priority once per specified timeout. It allows to return to
--- the best server, if it was unavailable and has returned back.
--- And if the connection attempt was not successfull, then the
--- candidate must try a replica next by priority.
+-- Once per FAILOVER_UP_TIMEOUT a replicaset must try to connect
+-- to a replica with a higher priority.
 --
-local function failover_need_update_candidate(replicaset, curr_ts)
+local function failover_need_up_priority(replicaset, curr_ts)
     local up_ts = replicaset.replica_up_ts
-    -- First attempt to connect to replica.
-    if not up_ts then
-        return true
-    end
-    -- Try to reconnect to the best replica once per UP_TIMEOUT.
-    if curr_ts - up_ts >= consts.FAILOVER_UP_TIMEOUT then
-        return true
-    end
-    -- Candidate can not connect to a replica. Try next by
-    -- priority, if it is not current replica. Candidate always
-    -- must have weight <= current replica weight.
-    local candidate = replicaset.candidate
-    return candidate and
-           curr_ts - candidate.down_ts >= consts.FAILOVER_DOWN_TIMEOUT and
-           candidate.next_by_priority and
-           candidate.next_by_priority ~= replicaset.replica
-end
-
---
--- Check that a candidate is connected to its replica. In such a
--- case it becames new replica, because its weight <= current one.
---
-local function failover_is_candidate_connected(replicaset)
-    local candidate = replicaset.candidate
-    return candidate and candidate:is_connected()
+    return not up_ts or curr_ts - up_ts >= consts.FAILOVER_UP_TIMEOUT
 end
 
 --
@@ -379,8 +352,7 @@ local function failover_collect_to_update()
     local uuid_to_update = {}
     for uuid, rs in pairs(M.replicasets) do
         if failover_need_down_priority(rs, ts) or
-           failover_is_candidate_connected(rs) or
-           failover_need_update_candidate(rs, ts) then
+           failover_need_up_priority(rs, ts) then
             table.insert(uuid_to_update, uuid)
         end
     end
@@ -413,13 +385,10 @@ local function failover_step()
             return true
         end
         local old_replica = rs.replica
-        if failover_is_candidate_connected(rs) then
-            rs:set_candidate_as_replica()
+        if failover_need_up_priority(rs, curr_ts) then
+            rs:up_replica_priority()
             replica_is_changed = true
         end
-        if failover_need_update_candidate(rs, curr_ts) then
-            rs:update_candidate()
-        end
         if failover_need_down_priority(rs, curr_ts) then
             rs:down_replica_priority()
             replica_is_changed = true
@@ -519,7 +488,6 @@ local function router_cfg(cfg)
     -- connections and yield.
     for _, replicaset in pairs(new_replicasets) do
         replicaset:connect_all()
-        replicaset:update_candidate()
     end
     lreplicaset.wait_masters_connect(new_replicasets)
     if M.failover_fiber == nil then
-- 
2.14.3 (Apple Git-98)




More information about the Tarantool-patches mailing list