From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from localhost (localhost [127.0.0.1]) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id B203D273F2 for ; Thu, 26 Jul 2018 04:27:41 -0400 (EDT) Received: from turing.freelists.org ([127.0.0.1]) by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id nPN9ptEcEQXd for ; Thu, 26 Jul 2018 04:27:41 -0400 (EDT) Received: from smtp38.i.mail.ru (smtp38.i.mail.ru [94.100.177.98]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id E7B17273DD for ; Thu, 26 Jul 2018 04:27:40 -0400 (EDT) From: AKhatskevich Subject: [tarantool-patches] [PATCH 1/3] Fix races related to object outdating Date: Thu, 26 Jul 2018 11:27:15 +0300 Message-Id: <18f2ede05fa4a77bf0bd2abb64c25df0e3c574d6.1532593430.git.avkhatskevich@tarantool.org> In-Reply-To: References: In-Reply-To: References: Sender: tarantool-patches-bounce@freelists.org Errors-to: tarantool-patches-bounce@freelists.org Reply-To: tarantool-patches@freelists.org List-help: List-unsubscribe: List-software: Ecartis version 1.0.0 List-Id: tarantool-patches List-subscribe: List-owner: List-post: List-archive: To: v.shpilevoy@tarantool.org, tarantool-patches@freelists.org Reload/reconfigure may replace many fields of `M` during any yield, so old objects must not be accessed after they have been outdated. This commit handles such cases within `vshard.router`. Outdating of old replicaset objects is moved out of `rebind_replicasets()` into a separate `outdate_replicasets()` function, which `vshard.storage` now calls explicitly as well. 
--- vshard/replicaset.lua | 30 ++++++++++++++----------- vshard/router/init.lua | 58 +++++++++++++++++++++++++++++-------------------- vshard/storage/init.lua | 1 + 3 files changed, 52 insertions(+), 37 deletions(-) diff --git a/vshard/replicaset.lua b/vshard/replicaset.lua index 6c8d477..87e26d3 100644 --- a/vshard/replicaset.lua +++ b/vshard/replicaset.lua @@ -340,16 +340,13 @@ local function replicaset_tostring(replicaset) master) end -local outdate_replicasets -- -- Copy netbox connections from old replica objects to new ones -- and outdate old objects. -- @param replicasets New replicasets -- @param old_replicasets Replicasets and replicas to be outdated. --- @param outdate_delay Number of seconds; delay to outdate --- old objects. -- -local function rebind_replicasets(replicasets, old_replicasets, outdate_delay) +local function rebind_replicasets(replicasets, old_replicasets) for replicaset_uuid, replicaset in pairs(replicasets) do local old_replicaset = old_replicasets and old_replicasets[replicaset_uuid] @@ -370,9 +367,6 @@ local function rebind_replicasets(replicasets, old_replicasets, outdate_delay) end end end - if old_replicasets then - util.async_task(outdate_delay, outdate_replicasets, old_replicasets) - end end -- @@ -453,12 +447,7 @@ for fname, func in pairs(replica_mt.__index) do outdated_replica_mt.__index[fname] = outdated_warning end --- --- Outdate replicaset and replica objects: --- * Set outdated_metatables. --- * Remove connections. --- -outdate_replicasets = function(replicasets) +local outdate_replicasets_internal = function(replicasets) for _, replicaset in pairs(replicasets) do setmetatable(replicaset, outdated_replicaset_mt) for _, replica in pairs(replicaset.replicas) do @@ -469,6 +458,20 @@ outdate_replicasets = function(replicasets) log.info('Old replicaset and replica objects are outdated.') end +-- +-- Outdate replicaset and replica objects: +-- * Set outdated_metatables. +-- * Remove connections. 
+-- @param replicasets Old replicasets to be outdated. +-- @param outdate_delay Delay in seconds before the outdating. +-- +local function outdate_replicasets(replicasets, outdate_delay) + if replicasets then + util.async_task(outdate_delay, outdate_replicasets_internal, + replicasets) + end +end + -- -- Calculate for each replicaset its etalon bucket count. -- Iterative algorithm is used to learn the best balance in a @@ -650,4 +653,5 @@ return { calculate_etalon_balance = cluster_calculate_etalon_balance, wait_masters_connect = wait_masters_connect, rebind_replicasets = rebind_replicasets, + outdate_replicasets = outdate_replicasets, } diff --git a/vshard/router/init.lua b/vshard/router/init.lua index 142ddb6..1a0ed2f 100644 --- a/vshard/router/init.lua +++ b/vshard/router/init.lua @@ -52,9 +52,14 @@ if not M then } end --- Set a replicaset by container of a bucket. -local function bucket_set(bucket_id, replicaset) - assert(replicaset) +-- Set a bucket to a replicaset. +local function bucket_set(bucket_id, rs_uuid) + local replicaset = M.replicasets[rs_uuid] + -- It is technically possible to delete a replicaset at the + -- same time when route to the bucket is discovered. + if not replicaset then + return nil, lerror.vshard(lerror.code.NO_ROUTE_TO_BUCKET, bucket_id) + end local old_replicaset = M.route_map[bucket_id] if old_replicaset ~= replicaset then if old_replicaset then @@ -63,6 +68,7 @@ local function bucket_set(bucket_id, replicaset) replicaset.bucket_count = replicaset.bucket_count + 1 end M.route_map[bucket_id] = replicaset + return replicaset end -- Remove a bucket from the cache. 
@@ -88,15 +94,18 @@ local function bucket_discovery(bucket_id) log.verbose("Discovering bucket %d", bucket_id) local last_err = nil local unreachable_uuid = nil - for uuid, replicaset in pairs(M.replicasets) do - local _, err = - replicaset:callrw('vshard.storage.bucket_stat', {bucket_id}) - if err == nil then - bucket_set(bucket_id, replicaset) - return replicaset - elseif err.code ~= lerror.code.WRONG_BUCKET then - last_err = err - unreachable_uuid = uuid + for uuid, _ in pairs(M.replicasets) do + -- Handle reload/reconfigure. + replicaset = M.replicasets[uuid] + if replicaset then + local _, err = + replicaset:callrw('vshard.storage.bucket_stat', {bucket_id}) + if err == nil then + return bucket_set(bucket_id, replicaset.uuid) + elseif err.code ~= lerror.code.WRONG_BUCKET then + last_err = err + unreachable_uuid = uuid + end end end local err = nil @@ -262,13 +271,13 @@ local function router_call(bucket_id, mode, func, args, opts) end end else - bucket_set(bucket_id, replicaset) + replicaset = bucket_set(bucket_id, replicaset.uuid) lfiber.yield() -- Protect against infinite cycle in a -- case of broken cluster, when a bucket -- is sent on two replicasets to each -- other. - if lfiber.time() <= tend then + if replicaset and lfiber.time() <= tend then goto replicaset_is_found end end @@ -513,27 +522,28 @@ local function router_cfg(cfg) end box.cfg(box_cfg) log.info("Box has been configured") - M.connection_outdate_delay = cfg.connection_outdate_delay - M.total_bucket_count = total_bucket_count - M.collect_lua_garbage = collect_lua_garbage - M.current_cfg = new_cfg -- Move connections from an old configuration to a new one. -- It must be done with no yields to prevent usage both of not -- fully moved old replicasets, and not fully built new ones. 
- lreplicaset.rebind_replicasets(new_replicasets, M.replicasets, - M.connection_outdate_delay) - M.replicasets = new_replicasets + lreplicaset.rebind_replicasets(new_replicasets, M.replicasets) -- Now the new replicasets are fully built. Can establish -- connections and yield. for _, replicaset in pairs(new_replicasets) do replicaset:connect_all() end + lreplicaset.wait_masters_connect(new_replicasets) + lreplicaset.outdate_replicasets(M.replicasets, cfg.connection_outdate_delay) + M.connection_outdate_delay = cfg.connection_outdate_delay + M.total_bucket_count = total_bucket_count + M.collect_lua_garbage = collect_lua_garbage + M.current_cfg = cfg + M.replicasets = new_replicasets -- Update existing route map in-place. - for bucket, rs in pairs(M.route_map) do + local old_route_map = M.route_map + M.route_map = {} + for bucket, rs in pairs(old_route_map) do M.route_map[bucket] = M.replicasets[rs.uuid] end - - lreplicaset.wait_masters_connect(new_replicasets) if M.failover_fiber == nil then lfiber.create(util.reloadable_fiber_f, M, 'failover_f', 'Failover') end diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua index 07bd00c..c1df0e6 100644 --- a/vshard/storage/init.lua +++ b/vshard/storage/init.lua @@ -1620,6 +1620,7 @@ local function storage_cfg(cfg, this_replica_uuid) box.once("vshard:storage:1", storage_schema_v1, uri.login, uri.password) lreplicaset.rebind_replicasets(new_replicasets, M.replicasets) + lreplicaset.outdate_replicasets(M.replicasets) M.replicasets = new_replicasets M.this_replicaset = this_replicaset M.this_replica = this_replica -- 2.14.1