[tarantool-patches] [PATCH 1/3] Fix races related to object outdating

AKhatskevich avkhatskevich at tarantool.org
Thu Jul 26 11:27:15 MSK 2018


A reload or reconfiguration may replace many fields of `M` during any yield.
Old objects must not be accessed after they have been outdated.

This commit handles such cases within `vshard.router`.
---
 vshard/replicaset.lua   | 30 ++++++++++++++-----------
 vshard/router/init.lua  | 58 +++++++++++++++++++++++++++++--------------------
 vshard/storage/init.lua |  1 +
 3 files changed, 52 insertions(+), 37 deletions(-)

diff --git a/vshard/replicaset.lua b/vshard/replicaset.lua
index 6c8d477..87e26d3 100644
--- a/vshard/replicaset.lua
+++ b/vshard/replicaset.lua
@@ -340,16 +340,13 @@ local function replicaset_tostring(replicaset)
                          master)
 end
 
-local outdate_replicasets
 --
 -- Copy netbox connections from old replica objects to new ones
 -- and outdate old objects.
 -- @param replicasets New replicasets
 -- @param old_replicasets Replicasets and replicas to be outdated.
--- @param outdate_delay Number of seconds; delay to outdate
---        old objects.
 --
-local function rebind_replicasets(replicasets, old_replicasets, outdate_delay)
+local function rebind_replicasets(replicasets, old_replicasets)
     for replicaset_uuid, replicaset in pairs(replicasets) do
         local old_replicaset = old_replicasets and
                                old_replicasets[replicaset_uuid]
@@ -370,9 +367,6 @@ local function rebind_replicasets(replicasets, old_replicasets, outdate_delay)
             end
         end
     end
-    if old_replicasets then
-        util.async_task(outdate_delay, outdate_replicasets, old_replicasets)
-    end
 end
 
 --
@@ -453,12 +447,7 @@ for fname, func in pairs(replica_mt.__index) do
     outdated_replica_mt.__index[fname] = outdated_warning
 end
 
---
--- Outdate replicaset and replica objects:
---  * Set outdated_metatables.
---  * Remove connections.
---
-outdate_replicasets = function(replicasets)
+local outdate_replicasets_internal = function(replicasets)
     for _, replicaset in pairs(replicasets) do
         setmetatable(replicaset, outdated_replicaset_mt)
         for _, replica in pairs(replicaset.replicas) do
@@ -469,6 +458,20 @@ outdate_replicasets = function(replicasets)
     log.info('Old replicaset and replica objects are outdated.')
 end
 
+--
+-- Outdate replicaset and replica objects:
+--  * Set outdated_metatables.
+--  * Remove connections.
+-- @param replicasets Old replicasets to be outdated.
+-- @param outdate_delay Delay in seconds before the outdating.
+--
+local function outdate_replicasets(replicasets, outdate_delay)
+    if replicasets then
+        util.async_task(outdate_delay, outdate_replicasets_internal,
+                        replicasets)
+    end
+end
+
 --
 -- Calculate for each replicaset its etalon bucket count.
 -- Iterative algorithm is used to learn the best balance in a
@@ -650,4 +653,5 @@ return {
     calculate_etalon_balance = cluster_calculate_etalon_balance,
     wait_masters_connect = wait_masters_connect,
     rebind_replicasets = rebind_replicasets,
+    outdate_replicasets = outdate_replicasets,
 }
diff --git a/vshard/router/init.lua b/vshard/router/init.lua
index 142ddb6..1a0ed2f 100644
--- a/vshard/router/init.lua
+++ b/vshard/router/init.lua
@@ -52,9 +52,14 @@ if not M then
     }
 end
 
--- Set a replicaset by container of a bucket.
-local function bucket_set(bucket_id, replicaset)
-    assert(replicaset)
+-- Set a bucket to a replicaset.
+local function bucket_set(bucket_id, rs_uuid)
+    local replicaset = M.replicasets[rs_uuid]
+    -- It is technically possible for a replicaset to be deleted
+    -- while the route to the bucket is being discovered.
+    if not replicaset then
+        return nil, lerror.vshard(lerror.code.NO_ROUTE_TO_BUCKET, bucket_id)
+    end
     local old_replicaset = M.route_map[bucket_id]
     if old_replicaset ~= replicaset then
         if old_replicaset then
@@ -63,6 +68,7 @@ local function bucket_set(bucket_id, replicaset)
         replicaset.bucket_count = replicaset.bucket_count + 1
     end
     M.route_map[bucket_id] = replicaset
+    return replicaset
 end
 
 -- Remove a bucket from the cache.
@@ -88,15 +94,18 @@ local function bucket_discovery(bucket_id)
     log.verbose("Discovering bucket %d", bucket_id)
     local last_err = nil
     local unreachable_uuid = nil
-    for uuid, replicaset in pairs(M.replicasets) do
-        local _, err =
-            replicaset:callrw('vshard.storage.bucket_stat', {bucket_id})
-        if err == nil then
-            bucket_set(bucket_id, replicaset)
-            return replicaset
-        elseif err.code ~= lerror.code.WRONG_BUCKET then
-            last_err = err
-            unreachable_uuid = uuid
+    for uuid, _ in pairs(M.replicasets) do
+        -- Re-fetch by uuid: M.replicasets may be replaced on a yield.
+        local replicaset = M.replicasets[uuid]
+        if replicaset then
+            local _, err =
+                replicaset:callrw('vshard.storage.bucket_stat', {bucket_id})
+            if err == nil then
+                return bucket_set(bucket_id, replicaset.uuid)
+            elseif err.code ~= lerror.code.WRONG_BUCKET then
+                last_err = err
+                unreachable_uuid = uuid
+            end
         end
     end
     local err = nil
@@ -262,13 +271,13 @@ local function router_call(bucket_id, mode, func, args, opts)
                             end
                         end
                     else
-                        bucket_set(bucket_id, replicaset)
+                        replicaset = bucket_set(bucket_id, replicaset.uuid)
                         lfiber.yield()
                         -- Protect against infinite cycle in a
                         -- case of broken cluster, when a bucket
                         -- is sent on two replicasets to each
                         -- other.
-                        if lfiber.time() <= tend then
+                        if replicaset and lfiber.time() <= tend then
                             goto replicaset_is_found
                         end
                     end
@@ -513,27 +522,28 @@ local function router_cfg(cfg)
     end
     box.cfg(box_cfg)
     log.info("Box has been configured")
-    M.connection_outdate_delay = cfg.connection_outdate_delay
-    M.total_bucket_count = total_bucket_count
-    M.collect_lua_garbage = collect_lua_garbage
-    M.current_cfg = new_cfg
     -- Move connections from an old configuration to a new one.
     -- It must be done with no yields to prevent usage both of not
     -- fully moved old replicasets, and not fully built new ones.
-    lreplicaset.rebind_replicasets(new_replicasets, M.replicasets,
-                                   M.connection_outdate_delay)
-    M.replicasets = new_replicasets
+    lreplicaset.rebind_replicasets(new_replicasets, M.replicasets)
     -- Now the new replicasets are fully built. Can establish
     -- connections and yield.
     for _, replicaset in pairs(new_replicasets) do
         replicaset:connect_all()
     end
+    lreplicaset.wait_masters_connect(new_replicasets)
+    lreplicaset.outdate_replicasets(M.replicasets, cfg.connection_outdate_delay)
+    M.connection_outdate_delay = cfg.connection_outdate_delay
+    M.total_bucket_count = total_bucket_count
+    M.collect_lua_garbage = collect_lua_garbage
+    M.current_cfg = cfg
+    M.replicasets = new_replicasets
     -- Update existing route map in-place.
-    for bucket, rs in pairs(M.route_map) do
+    local old_route_map = M.route_map
+    M.route_map = {}
+    for bucket, rs in pairs(old_route_map) do
         M.route_map[bucket] = M.replicasets[rs.uuid]
     end
-
-    lreplicaset.wait_masters_connect(new_replicasets)
     if M.failover_fiber == nil then
         lfiber.create(util.reloadable_fiber_f, M, 'failover_f', 'Failover')
     end
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index 07bd00c..c1df0e6 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -1620,6 +1620,7 @@ local function storage_cfg(cfg, this_replica_uuid)
     box.once("vshard:storage:1", storage_schema_v1, uri.login, uri.password)
 
     lreplicaset.rebind_replicasets(new_replicasets, M.replicasets)
+    lreplicaset.outdate_replicasets(M.replicasets)
     M.replicasets = new_replicasets
     M.this_replicaset = this_replicaset
     M.this_replica = this_replica
-- 
2.14.1





More information about the Tarantool-patches mailing list