[patches] [PATCH vshard 3/5] On reconfig reuse old connections if possible

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Fri Mar 2 00:39:39 MSK 2018


Part of #76

Signed-off-by: Vladislav Shpilevoy <v.shpilevoy at tarantool.org>
---
 test/router/router.result     | 63 +++++++++++++++++++++++++++++++++++++++++++
 test/router/router.test.lua   | 29 ++++++++++++++++++++
 test/storage/storage.result   | 60 +++++++++++++++++++++++++++++++++++++++++
 test/storage/storage.test.lua | 29 ++++++++++++++++++++
 vshard/replicaset.lua         | 37 ++++++++++++++++++++++---
 vshard/router/init.lua        | 21 ++++++++++-----
 vshard/storage/init.lua       |  5 +++-
 7 files changed, 233 insertions(+), 11 deletions(-)

diff --git a/test/router/router.result b/test/router/router.result
index fcf451f..c7b3437 100644
--- a/test/router/router.result
+++ b/test/router/router.result
@@ -92,6 +92,69 @@ rs2.replica == rs2.master
 - true
 ...
 --
+-- Part of gh-76: on reconfiguration do not recreate connections
+-- to replicas, that are kept in a new configuration.
+--
+old_replicasets = vshard.router.internal.replicasets
+---
+...
+old_connections = {}
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+for _, old_rs in pairs(old_replicasets) do
+    for uuid, old_replica in pairs(old_rs.replicas) do
+        old_connections[uuid] = old_replica.conn
+    end
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+vshard.router.cfg(cfg)
+---
+...
+new_replicasets = vshard.router.internal.replicasets
+---
+...
+old_replicasets ~= new_replicasets
+---
+- true
+...
+rs1 = vshard.router.internal.replicasets[replicasets[1]]
+---
+...
+rs2 = vshard.router.internal.replicasets[replicasets[2]]
+---
+...
+while not rs1.replica or not rs2.replica do fiber.sleep(0.1) end
+---
+...
+vshard.router.discovery_wakeup()
+---
+...
+-- Check that netbox connections are the same.
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+for _, new_rs in pairs(new_replicasets) do
+    for uuid, new_replica in pairs(new_rs.replicas) do
+        assert(old_connections[uuid] == new_replica.conn)
+    end
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+--
 -- bucket_id and bucket_count
 --
 util.check_error(vshard.router.bucket_id) -- invalid arguments
diff --git a/test/router/router.test.lua b/test/router/router.test.lua
index aedb015..5a51e2d 100644
--- a/test/router/router.test.lua
+++ b/test/router/router.test.lua
@@ -35,6 +35,35 @@ while not rs1.replica or not rs2.replica do fiber.sleep(0.1) end
 rs1.replica == rs1.master
 rs2.replica == rs2.master
 
+--
+-- Part of gh-76: on reconfiguration do not recreate connections
+-- to replicas, that are kept in a new configuration.
+--
+old_replicasets = vshard.router.internal.replicasets
+old_connections = {}
+test_run:cmd("setopt delimiter ';'")
+for _, old_rs in pairs(old_replicasets) do
+    for uuid, old_replica in pairs(old_rs.replicas) do
+        old_connections[uuid] = old_replica.conn
+    end
+end;
+test_run:cmd("setopt delimiter ''");
+vshard.router.cfg(cfg)
+new_replicasets = vshard.router.internal.replicasets
+old_replicasets ~= new_replicasets
+rs1 = vshard.router.internal.replicasets[replicasets[1]]
+rs2 = vshard.router.internal.replicasets[replicasets[2]]
+while not rs1.replica or not rs2.replica do fiber.sleep(0.1) end
+vshard.router.discovery_wakeup()
+-- Check that netbox connections are the same.
+test_run:cmd("setopt delimiter ';'")
+for _, new_rs in pairs(new_replicasets) do
+    for uuid, new_replica in pairs(new_rs.replicas) do
+        assert(old_connections[uuid] == new_replica.conn)
+    end
+end;
+test_run:cmd("setopt delimiter ''");
+
 --
 -- bucket_id and bucket_count
 --
diff --git a/test/storage/storage.result b/test/storage/storage.result
index dfc7411..537c85b 100644
--- a/test/storage/storage.result
+++ b/test/storage/storage.result
@@ -639,6 +639,66 @@ vshard.storage.buckets_info()
     status: active
     id: 2
 ...
+--
+-- Part of gh-76: check that netbox old connections are reused on
+-- reconfiguration.
+--
+old_connections = {}
+---
+...
+connection_count = 0
+---
+...
+old_replicasets = vshard.storage.internal.replicasets
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+for _, old_replicaset in pairs(old_replicasets) do
+	for uuid, old_replica in pairs(old_replicaset.replicas) do
+		old_connections[uuid] = old_replica.conn
+		if old_replica.conn then
+			connection_count = connection_count + 1
+		end
+	end
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+connection_count > 0
+---
+- true
+...
+vshard.storage.cfg(cfg, names.storage_1_a)
+---
+...
+new_replicasets = vshard.storage.internal.replicasets
+---
+...
+new_replicasets ~= old_replicasets
+---
+- true
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+for _, new_replicaset in pairs(new_replicasets) do
+	for uuid, new_replica in pairs(new_replicaset.replicas) do
+		assert(old_connections[uuid] == new_replica.conn)
+	end
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
 _ = test_run:cmd("switch default")
 ---
 ...
diff --git a/test/storage/storage.test.lua b/test/storage/storage.test.lua
index 47d9150..59e36e8 100644
--- a/test/storage/storage.test.lua
+++ b/test/storage/storage.test.lua
@@ -143,6 +143,35 @@ vshard.storage.buckets_info()
 _ = test_run:cmd("switch storage_1_a")
 vshard.storage.buckets_info()
 
+--
+-- Part of gh-76: check that netbox old connections are reused on
+-- reconfiguration.
+--
+old_connections = {}
+connection_count = 0
+old_replicasets = vshard.storage.internal.replicasets
+test_run:cmd("setopt delimiter ';'")
+for _, old_replicaset in pairs(old_replicasets) do
+	for uuid, old_replica in pairs(old_replicaset.replicas) do
+		old_connections[uuid] = old_replica.conn
+		if old_replica.conn then
+			connection_count = connection_count + 1
+		end
+	end
+end;
+test_run:cmd("setopt delimiter ''");
+connection_count > 0
+vshard.storage.cfg(cfg, names.storage_1_a)
+new_replicasets = vshard.storage.internal.replicasets
+new_replicasets ~= old_replicasets
+test_run:cmd("setopt delimiter ';'")
+for _, new_replicaset in pairs(new_replicasets) do
+	for uuid, new_replica in pairs(new_replicaset.replicas) do
+		assert(old_connections[uuid] == new_replica.conn)
+	end
+end;
+test_run:cmd("setopt delimiter ''");
+
 _ = test_run:cmd("switch default")
 
 test_run:drop_cluster(REPLICASET_2)
diff --git a/vshard/replicaset.lua b/vshard/replicaset.lua
index f4bc701..84799df 100644
--- a/vshard/replicaset.lua
+++ b/vshard/replicaset.lua
@@ -7,7 +7,7 @@
 --             uri = string,
 --             name = string,
 --             uuid = string,
---             conn = <netbox>,
+--             conn = <netbox> with .replica and .replicaset fields,
 --             zone = number,
 --             next_by_priority = <replica object of the same type>,
 --             weight = number,
@@ -398,12 +398,36 @@ local function replicaset_tostring(replicaset)
                          master)
 end
 
+--
+-- Rebind connections of old replicas to new ones.
+--
+local function replicaset_rebind_connections(replicaset)
+    for _, replica in pairs(replicaset.replicas) do
+        local old_replica = replica.old_replica
+        if old_replica then
+            local conn = old_replica.conn
+            replica.conn = conn
+            replica.down_ts = old_replica.down_ts
+            replica.net_timeout = old_replica.net_timeout
+            replica.net_sequential_ok = old_replica.net_sequential_ok
+            replica.net_sequential_fail = old_replica.net_sequential_fail
+            if conn then
+                conn.replica = replica
+                conn.replicaset = replicaset
+                old_replica.conn = nil
+            end
+            replica.old_replica = nil
+        end
+    end
+end
+
 --
 -- Meta-methods
 --
 local replicaset_mt = {
     __index = {
         connect = replicaset_connect;
+        rebind_connections = replicaset_rebind_connections;
         update_candidate = replicaset_update_candidate;
         down_replica_priority = replicaset_down_replica_priority;
         set_candidate_as_replica = replicaset_set_candidate_as_replica;
@@ -469,7 +493,7 @@ end
 --
 -- Update/build replicasets from configuration
 --
-local function buildall(sharding_cfg)
+local function buildall(sharding_cfg, old_replicasets)
     local new_replicasets = {}
     local weights = sharding_cfg.weights
     local zone = sharding_cfg.zone
@@ -481,6 +505,8 @@ local function buildall(sharding_cfg)
     end
     local curr_ts = fiber.time()
     for replicaset_uuid, replicaset in pairs(sharding_cfg.sharding) do
+        local old_replicaset = old_replicasets and
+                               old_replicasets[replicaset_uuid]
         local new_replicaset = setmetatable({
             replicas = {},
             uuid = replicaset_uuid,
@@ -489,11 +515,16 @@ local function buildall(sharding_cfg)
         }, replicaset_mt)
         local priority_list = {}
         for replica_uuid, replica in pairs(replicaset.replicas) do
+            local old_replica = old_replicaset and
+                                old_replicaset.replicas[replica_uuid]
+            -- The old replica is saved in the new object to
+            -- rebind its connection at the end of a
+            -- router/storage reconfiguration.
             local new_replica = setmetatable({
                 uri = replica.uri, name = replica.name, uuid = replica_uuid,
                 zone = replica.zone, net_timeout = consts.CALL_TIMEOUT_MIN,
                 net_sequential_ok = 0, net_sequential_fail = 0,
-                down_ts = curr_ts,
+                down_ts = curr_ts, old_replica = old_replica,
             }, replica_mt)
             new_replicaset.replicas[replica_uuid] = new_replica
             if replica.master then
diff --git a/vshard/router/init.lua b/vshard/router/init.lua
index a956369..6814637 100644
--- a/vshard/router/init.lua
+++ b/vshard/router/init.lua
@@ -491,16 +491,10 @@ local function router_cfg(cfg)
     else
         log.info('Starting router reconfiguration')
     end
-    local new_replicasets = lreplicaset.buildall(cfg)
+    local new_replicasets = lreplicaset.buildall(cfg, old_replicasets)
     local total_bucket_count = cfg.bucket_count
     local collect_lua_garbage = cfg.collect_lua_garbage
     lcfg.remove_non_box_options(cfg)
-    -- Force net.box connection on cfg()
-    for _, replicaset in pairs(new_replicasets) do
-        replicaset:connect()
-        replicaset:update_candidate()
-    end
-    lreplicaset.wait_masters_connect(new_replicasets)
     log.info("Calling box.cfg()...")
     for k, v in pairs(cfg) do
         log.info({[k] = v})
@@ -512,6 +506,19 @@ local function router_cfg(cfg)
     -- TODO: update existing route map in-place
     M.route_map = {}
     M.replicasets = new_replicasets
+    -- Move connections from an old configuration to a new one.
+    -- It must be done without yields, so that neither partially
+    -- moved old replicasets nor partially built new ones are used.
+    for _, replicaset in pairs(new_replicasets) do
+        replicaset:rebind_connections()
+    end
+    -- Now the new replicasets are fully built. Can establish
+    -- connections and yield.
+    for _, replicaset in pairs(new_replicasets) do
+        replicaset:connect()
+        replicaset:update_candidate()
+    end
+    lreplicaset.wait_masters_connect(new_replicasets)
     if M.failover_fiber == nil then
         log.info('Start failover fiber')
         lfiber.create(util.reloadable_fiber_f, M, 'failover_f', 'Failover')
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index 408e559..a0f216e 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -1298,7 +1298,7 @@ local function storage_cfg(cfg, this_replica_uuid)
 
     local this_replicaset
     local this_replica
-    local new_replicasets = lreplicaset.buildall(cfg)
+    local new_replicasets = lreplicaset.buildall(cfg, old_replicasets)
     local min_master
     for rs_uuid, rs in pairs(new_replicasets) do
         for replica_uuid, replica in pairs(rs.replicas) do
@@ -1356,6 +1356,9 @@ local function storage_cfg(cfg, this_replica_uuid)
     box.once("vshard:storage:1", storage_schema_v1, uri.login, uri.password)
 
     M.replicasets = new_replicasets
+    for _, replicaset in pairs(new_replicasets) do
+        replicaset:rebind_connections()
+    end
     M.this_replicaset = this_replicaset
     M.this_replica = this_replica
     M.total_bucket_count = total_bucket_count
-- 
2.14.3 (Apple Git-98)




More information about the Tarantool-patches mailing list