[tarantool-patches] [PATCH 1/3] Fix races related to object outdating
AKhatskevich
avkhatskevich at tarantool.org
Thu Jul 26 11:27:15 MSK 2018
Reload/reconfigure may replace many of M's fields during any yield.
Old objects should not be accessed after they are outdated.
This commit handles such cases within `vshard.router`.
---
vshard/replicaset.lua | 30 ++++++++++++++-----------
vshard/router/init.lua | 58 +++++++++++++++++++++++++++++--------------------
vshard/storage/init.lua | 1 +
3 files changed, 52 insertions(+), 37 deletions(-)
diff --git a/vshard/replicaset.lua b/vshard/replicaset.lua
index 6c8d477..87e26d3 100644
--- a/vshard/replicaset.lua
+++ b/vshard/replicaset.lua
@@ -340,16 +340,13 @@ local function replicaset_tostring(replicaset)
master)
end
-local outdate_replicasets
--
-- Copy netbox connections from old replica objects to new ones
-- and outdate old objects.
-- @param replicasets New replicasets
-- @param old_replicasets Replicasets and replicas to be outdated.
--- @param outdate_delay Number of seconds; delay to outdate
--- old objects.
--
-local function rebind_replicasets(replicasets, old_replicasets, outdate_delay)
+local function rebind_replicasets(replicasets, old_replicasets)
for replicaset_uuid, replicaset in pairs(replicasets) do
local old_replicaset = old_replicasets and
old_replicasets[replicaset_uuid]
@@ -370,9 +367,6 @@ local function rebind_replicasets(replicasets, old_replicasets, outdate_delay)
end
end
end
- if old_replicasets then
- util.async_task(outdate_delay, outdate_replicasets, old_replicasets)
- end
end
--
@@ -453,12 +447,7 @@ for fname, func in pairs(replica_mt.__index) do
outdated_replica_mt.__index[fname] = outdated_warning
end
---
--- Outdate replicaset and replica objects:
--- * Set outdated_metatables.
--- * Remove connections.
---
-outdate_replicasets = function(replicasets)
+local outdate_replicasets_internal = function(replicasets)
for _, replicaset in pairs(replicasets) do
setmetatable(replicaset, outdated_replicaset_mt)
for _, replica in pairs(replicaset.replicas) do
@@ -469,6 +458,20 @@ outdate_replicasets = function(replicasets)
log.info('Old replicaset and replica objects are outdated.')
end
+--
+-- Outdate replicaset and replica objects:
+-- * Set outdated_metatables.
+-- * Remove connections.
+-- @param replicasets Old replicasets to be outdated.
+-- @param outdate_delay Delay in seconds before the outdating.
+--
+local function outdate_replicasets(replicasets, outdate_delay)
+ if replicasets then
+ util.async_task(outdate_delay, outdate_replicasets_internal,
+ replicasets)
+ end
+end
+
--
-- Calculate for each replicaset its etalon bucket count.
-- Iterative algorithm is used to learn the best balance in a
@@ -650,4 +653,5 @@ return {
calculate_etalon_balance = cluster_calculate_etalon_balance,
wait_masters_connect = wait_masters_connect,
rebind_replicasets = rebind_replicasets,
+ outdate_replicasets = outdate_replicasets,
}
diff --git a/vshard/router/init.lua b/vshard/router/init.lua
index 142ddb6..1a0ed2f 100644
--- a/vshard/router/init.lua
+++ b/vshard/router/init.lua
@@ -52,9 +52,14 @@ if not M then
}
end
--- Set a replicaset by container of a bucket.
-local function bucket_set(bucket_id, replicaset)
- assert(replicaset)
+-- Set a bucket to a replicaset.
+local function bucket_set(bucket_id, rs_uuid)
+ local replicaset = M.replicasets[rs_uuid]
+ -- It is technically possible to delete a replicaset at the
+ -- same time when route to the bucket is discovered.
+ if not replicaset then
+ return nil, lerror.vshard(lerror.code.NO_ROUTE_TO_BUCKET, bucket_id)
+ end
local old_replicaset = M.route_map[bucket_id]
if old_replicaset ~= replicaset then
if old_replicaset then
@@ -63,6 +68,7 @@ local function bucket_set(bucket_id, replicaset)
replicaset.bucket_count = replicaset.bucket_count + 1
end
M.route_map[bucket_id] = replicaset
+ return replicaset
end
-- Remove a bucket from the cache.
@@ -88,15 +94,18 @@ local function bucket_discovery(bucket_id)
log.verbose("Discovering bucket %d", bucket_id)
local last_err = nil
local unreachable_uuid = nil
- for uuid, replicaset in pairs(M.replicasets) do
- local _, err =
- replicaset:callrw('vshard.storage.bucket_stat', {bucket_id})
- if err == nil then
- bucket_set(bucket_id, replicaset)
- return replicaset
- elseif err.code ~= lerror.code.WRONG_BUCKET then
- last_err = err
- unreachable_uuid = uuid
+ for uuid, _ in pairs(M.replicasets) do
+ -- Handle reload/reconfigure.
+ replicaset = M.replicasets[uuid]
+ if replicaset then
+ local _, err =
+ replicaset:callrw('vshard.storage.bucket_stat', {bucket_id})
+ if err == nil then
+ return bucket_set(bucket_id, replicaset.uuid)
+ elseif err.code ~= lerror.code.WRONG_BUCKET then
+ last_err = err
+ unreachable_uuid = uuid
+ end
end
end
local err = nil
@@ -262,13 +271,13 @@ local function router_call(bucket_id, mode, func, args, opts)
end
end
else
- bucket_set(bucket_id, replicaset)
+ replicaset = bucket_set(bucket_id, replicaset.uuid)
lfiber.yield()
-- Protect against infinite cycle in a
-- case of broken cluster, when a bucket
-- is sent on two replicasets to each
-- other.
- if lfiber.time() <= tend then
+ if replicaset and lfiber.time() <= tend then
goto replicaset_is_found
end
end
@@ -513,27 +522,28 @@ local function router_cfg(cfg)
end
box.cfg(box_cfg)
log.info("Box has been configured")
- M.connection_outdate_delay = cfg.connection_outdate_delay
- M.total_bucket_count = total_bucket_count
- M.collect_lua_garbage = collect_lua_garbage
- M.current_cfg = new_cfg
-- Move connections from an old configuration to a new one.
-- It must be done with no yields to prevent usage both of not
-- fully moved old replicasets, and not fully built new ones.
- lreplicaset.rebind_replicasets(new_replicasets, M.replicasets,
- M.connection_outdate_delay)
- M.replicasets = new_replicasets
+ lreplicaset.rebind_replicasets(new_replicasets, M.replicasets)
-- Now the new replicasets are fully built. Can establish
-- connections and yield.
for _, replicaset in pairs(new_replicasets) do
replicaset:connect_all()
end
+ lreplicaset.wait_masters_connect(new_replicasets)
+ lreplicaset.outdate_replicasets(M.replicasets, cfg.connection_outdate_delay)
+ M.connection_outdate_delay = cfg.connection_outdate_delay
+ M.total_bucket_count = total_bucket_count
+ M.collect_lua_garbage = collect_lua_garbage
+ M.current_cfg = cfg
+ M.replicasets = new_replicasets
-- Update existing route map in-place.
- for bucket, rs in pairs(M.route_map) do
+ local old_route_map = M.route_map
+ M.route_map = {}
+ for bucket, rs in pairs(old_route_map) do
M.route_map[bucket] = M.replicasets[rs.uuid]
end
-
- lreplicaset.wait_masters_connect(new_replicasets)
if M.failover_fiber == nil then
lfiber.create(util.reloadable_fiber_f, M, 'failover_f', 'Failover')
end
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index 07bd00c..c1df0e6 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -1620,6 +1620,7 @@ local function storage_cfg(cfg, this_replica_uuid)
box.once("vshard:storage:1", storage_schema_v1, uri.login, uri.password)
lreplicaset.rebind_replicasets(new_replicasets, M.replicasets)
+ lreplicaset.outdate_replicasets(M.replicasets)
M.replicasets = new_replicasets
M.this_replicaset = this_replicaset
M.this_replica = this_replica
--
2.14.1
More information about the Tarantool-patches
mailing list