[tarantool-patches] [PATCH 2/2] Fix discovery/reconfigure race

AKhatskevich avkhatskevich at tarantool.org
Fri Jun 15 15:47:59 MSK 2018


This commit prevents the discovery fiber from continuing to iterate
over old replicaset objects after a reconfigure or reload, and thus
from spoiling `route_map` with them.
---
 test/router/router.result   | 62 +++++++++++++++++++++++++++++++++++++++++++++
 test/router/router.test.lua | 42 ++++++++++++++++++++++++++++++
 vshard/router/init.lua      | 15 ++++++++++-
 3 files changed, 118 insertions(+), 1 deletion(-)

diff --git a/test/router/router.result b/test/router/router.result
index 5643f3e..e61505e 100644
--- a/test/router/router.result
+++ b/test/router/router.result
@@ -1095,6 +1095,68 @@ for bucket, old_rs in pairs(bucket_to_old_rs) do
 end;
 ---
 ...
+--
+-- Check route_map is not filled with old replica objects after
+-- recpnfigure.
+--
+-- Perform #replicasets phases of discovery, to update replicasets
+-- object in for loop of discovery fiber since previous cfg.
+for _, __ in pairs(vshard.router.internal.replicasets) do
+    vshard.router.discovery_wakeup()
+    fiber.sleep(0.02)
+end;
+---
+...
+-- Simulate long `callro`.
+-- Stuck on first rs in replicasets.
+vshard.router.internal.errinj.LONG_DISCOVERY = true;
+---
+...
+for _, __ in pairs(vshard.router.internal.replicasets) do
+    vshard.router.discovery_wakeup()
+    fiber.sleep(0.02)
+end;
+---
+...
+vshard.router.cfg(cfg);
+---
+...
+vshard.router.internal.errinj.LONG_DISCOVERY = nil;
+---
+...
+-- Do discovery iteration.
+vshard.router.discovery_wakeup()
+fiber.sleep(0.02)
+
+rs_cnt = 0;
+---
+...
+new_replicasets = {}
+for _, rs in pairs(vshard.router.internal.replicasets) do
+    new_replicasets[rs] = true
+    rs_cnt = rs_cnt + 1
+end;
+---
+...
+rs_cnt;
+---
+- 2
+...
+bucket_cnt = 0;
+---
+...
+for bucket_id, rs in pairs(vshard.router.internal.route_map) do
+    if not new_replicasets[rs] then
+        error('Old object added to route_map.')
+    end
+    bucket_cnt = bucket_cnt + 1
+end;
+---
+...
+bucket_cnt;
+---
+- 3000
+...
 test_run:cmd("setopt delimiter ''");
 ---
 - true
diff --git a/test/router/router.test.lua b/test/router/router.test.lua
index 106f3d8..528a84b 100644
--- a/test/router/router.test.lua
+++ b/test/router/router.test.lua
@@ -411,6 +411,48 @@ for bucket, old_rs in pairs(bucket_to_old_rs) do
         error("route_map was not updataed.")
     end
 end;
+
+--
+-- Check route_map is not filled with old replica objects after
+-- recpnfigure.
+--
+
+-- Perform #replicasets phases of discovery, to update replicasets
+-- object in for loop of discovery fiber since previous cfg.
+for _, __ in pairs(vshard.router.internal.replicasets) do
+    vshard.router.discovery_wakeup()
+    fiber.sleep(0.02)
+end;
+-- Simulate long `callro`.
+-- Stuck on first rs in replicasets.
+vshard.router.internal.errinj.LONG_DISCOVERY = true;
+for _, __ in pairs(vshard.router.internal.replicasets) do
+    vshard.router.discovery_wakeup()
+    fiber.sleep(0.02)
+end;
+
+vshard.router.cfg(cfg);
+vshard.router.internal.errinj.LONG_DISCOVERY = nil;
+-- Do discovery iteration.
+vshard.router.discovery_wakeup()
+fiber.sleep(0.02)
+
+rs_cnt = 0;
+new_replicasets = {}
+for _, rs in pairs(vshard.router.internal.replicasets) do
+    new_replicasets[rs] = true
+    rs_cnt = rs_cnt + 1
+end;
+rs_cnt;
+bucket_cnt = 0;
+for bucket_id, rs in pairs(vshard.router.internal.route_map) do
+    if not new_replicasets[rs] then
+        error('Old object added to route_map.')
+    end
+    bucket_cnt = bucket_cnt + 1
+end;
+bucket_cnt;
+
 test_run:cmd("setopt delimiter ''");
 
 _ = test_run:cmd("switch default")
diff --git a/vshard/router/init.lua b/vshard/router/init.lua
index 7e765fa..df5b343 100644
--- a/vshard/router/init.lua
+++ b/vshard/router/init.lua
@@ -127,10 +127,23 @@ local function discovery_f(module_version)
     local iterations_until_lua_gc =
         consts.COLLECT_LUA_GARBAGE_INTERVAL / consts.DISCOVERY_INTERVAL
     while module_version == M.module_version do
-        for _, replicaset in pairs(M.replicasets) do
+        local old_replicasets = M.replicasets
+        for rs_uuid, replicaset in pairs(M.replicasets) do
             local active_buckets, err =
                 replicaset:callro('vshard.storage.buckets_discovery', {},
                                   {timeout = 2})
+            while M.errinj.LONG_DISCOVERY do
+                -- Stuck on the first replicaset.
+                if rs_uuid ~= select(1, next(M.replicasets)) then
+                    break
+                end
+                lfiber.sleep(0.01)
+            end
+            -- Renew replicasets object in case of reconfigure
+            -- and reload events.
+            if M.replicasets ~= old_replicasets then
+                break
+            end
             if not active_buckets then
                 log.error('Error during discovery %s: %s', replicaset, err)
             else
-- 
2.14.1





More information about the Tarantool-patches mailing list