[Tarantool-patches] [PATCH vshard 5/6] router: introduce automatic master discovery

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Fri Jul 2 01:09:35 MSK 2021


Part of #75

@TarantoolBot document
Title: VShard router master discovery

Previously, the router was not able to find master nodes in the
configured replicasets on its own. It relied only on how they were
specified in the config.

This becomes a problem when the master changes and the change is
not delivered to the router's config for some reason. For instance,
the router does not rely on a central config provider. Or it does
rely on one, but the provider can't deliver a new config for some
reason.

This is getting especially tricky with built-in automatic master
elections, which are not supported by vshard yet. When they are,
they won't depend on any config, and the master role won't be
pinned to one node anymore.

Now there is a new feature to overcome the master search problem -
configurable automatic master discovery on the router.

The router goes to the replicasets marked as having an auto master,
finds a master in them, and periodically checks if the master is
still a master.

When a master in a replicaset stops being a master, the router
walks all nodes of the replicaset and finds who is the new master.

To turn the feature on there is a new option in router's config:
`master = 'auto'`. It should be specified per-replicaset, and is
not compatible with specifying a master manually.

This is how a good config looks:
```
config = {
    sharding = {
        <replicaset uuid> = {
            master = 'auto',
            replicas = {...},
        },
        ...
    },
    ...
}
```

This is how a bad config looks:
```
config = {
    sharding = {
        <replicaset uuid> = {
            master = 'auto',
            replicas = {
                <replica uuid1> = {
                    master = true,
                    ...
                },
                <replica uuid2> = {
                    master = false,
                    ...
                },
            },
        },
        ...
    },
    ...
}
```
It will not work, because either `master = 'auto'` is specified, or
the master is assigned manually — not both at the same time.
---
 test/router/master_discovery.result   | 514 ++++++++++++++++++++++++++
 test/router/master_discovery.test.lua | 248 +++++++++++++
 vshard/consts.lua                     |   5 +
 vshard/error.lua                      |   5 +
 vshard/replicaset.lua                 | 203 +++++++++-
 vshard/router/init.lua                |  98 ++++-
 6 files changed, 1051 insertions(+), 22 deletions(-)
 create mode 100644 test/router/master_discovery.result
 create mode 100644 test/router/master_discovery.test.lua

diff --git a/test/router/master_discovery.result b/test/router/master_discovery.result
new file mode 100644
index 0000000..2fca1e4
--- /dev/null
+++ b/test/router/master_discovery.result
@@ -0,0 +1,514 @@
+-- test-run result file version 2
+test_run = require('test_run').new()
+ | ---
+ | ...
+REPLICASET_1 = { 'storage_1_a', 'storage_1_b' }
+ | ---
+ | ...
+REPLICASET_2 = { 'storage_2_a', 'storage_2_b' }
+ | ---
+ | ...
+test_run:create_cluster(REPLICASET_1, 'router')
+ | ---
+ | ...
+test_run:create_cluster(REPLICASET_2, 'router')
+ | ---
+ | ...
+util = require('util')
+ | ---
+ | ...
+util.wait_master(test_run, REPLICASET_1, 'storage_1_a')
+ | ---
+ | ...
+util.wait_master(test_run, REPLICASET_2, 'storage_2_a')
+ | ---
+ | ...
+util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, 'bootstrap_storage(\'memtx\')')
+ | ---
+ | ...
+util.push_rs_filters(test_run)
+ | ---
+ | ...
+_ = test_run:cmd("create server router_1 with script='router/router_1.lua'")
+ | ---
+ | ...
+_ = test_run:cmd("start server router_1")
+ | ---
+ | ...
+
+--
+-- gh-75: automatic master discovery on router.
+--
+
+_ = test_run:switch("router_1")
+ | ---
+ | ...
+util = require('util')
+ | ---
+ | ...
+vshard.router.bootstrap()
+ | ---
+ | - true
+ | ...
+
+for _, rs in pairs(cfg.sharding) do                                             \
+    for _, r in pairs(rs.replicas) do                                           \
+        r.master = nil                                                          \
+    end                                                                         \
+end                                                                             \
+
+function enable_auto_masters()                                                  \
+    for _, rs in pairs(cfg.sharding) do                                         \
+        rs.master = 'auto'                                                      \
+    end                                                                         \
+    vshard.router.cfg(cfg)                                                      \
+end
+ | ---
+ | ...
+
+function disable_auto_masters()                                                 \
+    for _, rs in pairs(cfg.sharding) do                                         \
+        rs.master = nil                                                         \
+    end                                                                         \
+    vshard.router.cfg(cfg)                                                      \
+end
+ | ---
+ | ...
+
+-- But do not forget the buckets. Otherwise bucket discovery will establish
+-- the connections instead of external requests.
+function forget_masters()                                                       \
+    disable_auto_masters()                                                      \
+    enable_auto_masters()                                                       \
+end
+ | ---
+ | ...
+
+function check_all_masters_found()                                              \
+    for _, rs in pairs(vshard.router.static.replicasets) do                     \
+        if not rs.master then                                                   \
+            vshard.router.static.master_search_fiber:wakeup()                   \
+            return false                                                        \
+        end                                                                     \
+    end                                                                         \
+    return true                                                                 \
+end
+ | ---
+ | ...
+
+function check_master_for_replicaset(rs_id, master_name)                        \
+    local rs_uuid = util.replicasets[rs_id]                                     \
+    local master_uuid = util.name_to_uuid[master_name]                          \
+    local master = vshard.router.static.replicasets[rs_uuid].master             \
+    if not master or master.uuid ~= master_uuid then                            \
+        vshard.router.static.master_search_fiber:wakeup()                       \
+        return false                                                            \
+    end                                                                         \
+    return true                                                                 \
+end
+ | ---
+ | ...
+
+function check_all_buckets_found()                                              \
+    if vshard.router.info().bucket.unknown == 0 then                            \
+        return true                                                             \
+    end                                                                         \
+    vshard.router.discovery_wakeup()                                            \
+    return false                                                                \
+end
+ | ---
+ | ...
+
+master_search_helper_f = nil
+ | ---
+ | ...
+function aggressive_master_search_f()                                           \
+    while true do                                                               \
+        vshard.router.static.master_search_fiber:wakeup()                       \
+        fiber.sleep(0.001)                                                      \
+    end                                                                         \
+end
+ | ---
+ | ...
+
+function start_aggressive_master_search()                                       \
+    assert(master_search_helper_f == nil)                                       \
+    master_search_helper_f = fiber.new(aggressive_master_search_f)              \
+    master_search_helper_f:set_joinable(true)                                   \
+end
+ | ---
+ | ...
+
+function stop_aggressive_master_search()                                        \
+    assert(master_search_helper_f ~= nil)                                       \
+    master_search_helper_f:cancel()                                             \
+    master_search_helper_f:join()                                               \
+    master_search_helper_f = nil                                                \
+end
+ | ---
+ | ...
+
+--
+-- Simulate the first cfg when no masters are known.
+--
+forget_masters()
+ | ---
+ | ...
+assert(vshard.router.static.master_search_fiber ~= nil)
+ | ---
+ | - true
+ | ...
+test_run:wait_cond(check_all_masters_found)
+ | ---
+ | - true
+ | ...
+test_run:wait_cond(check_all_buckets_found)
+ | ---
+ | - true
+ | ...
+
+--
+-- Change master and see how router finds it again.
+--
+test_run:switch('storage_1_a')
+ | ---
+ | - true
+ | ...
+replicas = cfg.sharding[util.replicasets[1]].replicas
+ | ---
+ | ...
+replicas[util.name_to_uuid.storage_1_a].master = false
+ | ---
+ | ...
+replicas[util.name_to_uuid.storage_1_b].master = true
+ | ---
+ | ...
+vshard.storage.cfg(cfg, instance_uuid)
+ | ---
+ | ...
+
+test_run:switch('storage_1_b')
+ | ---
+ | - true
+ | ...
+replicas = cfg.sharding[util.replicasets[1]].replicas
+ | ---
+ | ...
+replicas[util.name_to_uuid.storage_1_a].master = false
+ | ---
+ | ...
+replicas[util.name_to_uuid.storage_1_b].master = true
+ | ---
+ | ...
+vshard.storage.cfg(cfg, instance_uuid)
+ | ---
+ | ...
+
+test_run:switch('router_1')
+ | ---
+ | - true
+ | ...
+big_timeout = 1000000
+ | ---
+ | ...
+opts_big_timeout = {timeout = big_timeout}
+ | ---
+ | ...
+test_run:wait_cond(function()                                                   \
+    return check_master_for_replicaset(1, 'storage_1_b')                        \
+end)
+ | ---
+ | - true
+ | ...
+vshard.router.callrw(1501, 'echo', {1}, opts_big_timeout)
+ | ---
+ | - 1
+ | ...
+
+test_run:switch('storage_1_b')
+ | ---
+ | - true
+ | ...
+assert(echo_count == 1)
+ | ---
+ | - true
+ | ...
+echo_count = 0
+ | ---
+ | ...
+
+--
+-- Revert the master back.
+--
+test_run:switch('storage_1_a')
+ | ---
+ | - true
+ | ...
+replicas = cfg.sharding[util.replicasets[1]].replicas
+ | ---
+ | ...
+replicas[util.name_to_uuid.storage_1_a].master = true
+ | ---
+ | ...
+replicas[util.name_to_uuid.storage_1_b].master = false
+ | ---
+ | ...
+vshard.storage.cfg(cfg, instance_uuid)
+ | ---
+ | ...
+
+test_run:switch('storage_1_b')
+ | ---
+ | - true
+ | ...
+replicas = cfg.sharding[util.replicasets[1]].replicas
+ | ---
+ | ...
+replicas[util.name_to_uuid.storage_1_a].master = true
+ | ---
+ | ...
+replicas[util.name_to_uuid.storage_1_b].master = false
+ | ---
+ | ...
+vshard.storage.cfg(cfg, instance_uuid)
+ | ---
+ | ...
+
+test_run:switch('router_1')
+ | ---
+ | - true
+ | ...
+test_run:wait_cond(function()                                                   \
+    return check_master_for_replicaset(1, 'storage_1_a')                        \
+end)
+ | ---
+ | - true
+ | ...
+
+--
+-- Call tries to wait for master if has enough time left.
+--
+start_aggressive_master_search()
+ | ---
+ | ...
+
+forget_masters()
+ | ---
+ | ...
+vshard.router.callrw(1501, 'echo', {1}, opts_big_timeout)
+ | ---
+ | - 1
+ | ...
+
+forget_masters()
+ | ---
+ | ...
+-- XXX: this should not depend on master so much. RO requests should be able to
+-- go to replicas.
+vshard.router.callro(1501, 'echo', {1}, opts_big_timeout)
+ | ---
+ | - 1
+ | ...
+
+forget_masters()
+ | ---
+ | ...
+vshard.router.route(1501):callrw('echo', {1}, opts_big_timeout)
+ | ---
+ | - 1
+ | - null
+ | - null
+ | ...
+
+forget_masters()
+ | ---
+ | ...
+-- XXX: the same as above - should not really wait for master. Regardless of it
+-- being auto or not.
+vshard.router.route(1501):callro('echo', {1}, opts_big_timeout)
+ | ---
+ | - 1
+ | - null
+ | - null
+ | ...
+
+stop_aggressive_master_search()
+ | ---
+ | ...
+
+test_run:switch('storage_1_a')
+ | ---
+ | - true
+ | ...
+assert(echo_count == 4)
+ | ---
+ | - true
+ | ...
+echo_count = 0
+ | ---
+ | ...
+
+--
+-- Old replicaset objects stop waiting for master when search is disabled.
+--
+
+-- Turn off masters on the first replicaset.
+replicas = cfg.sharding[util.replicasets[1]].replicas
+ | ---
+ | ...
+replicas[util.name_to_uuid.storage_1_a].master = false
+ | ---
+ | ...
+vshard.storage.cfg(cfg, instance_uuid)
+ | ---
+ | ...
+
+test_run:switch('storage_1_b')
+ | ---
+ | - true
+ | ...
+replicas = cfg.sharding[util.replicasets[1]].replicas
+ | ---
+ | ...
+replicas[util.name_to_uuid.storage_1_a].master = false
+ | ---
+ | ...
+vshard.storage.cfg(cfg, instance_uuid)
+ | ---
+ | ...
+
+-- Try to make RW and RO requests but then turn of the auto search.
+test_run:switch('router_1')
+ | ---
+ | - true
+ | ...
+forget_masters()
+ | ---
+ | ...
+f1 = fiber.create(function()                                                    \
+    fiber.self():set_joinable(true)                                             \
+    return vshard.router.callrw(1501, 'echo', {1}, opts_big_timeout)            \
+end)
+ | ---
+ | ...
+-- XXX: should not really wait for master since this is an RO request. It could
+-- use a replica.
+f2 = fiber.create(function()                                                    \
+    fiber.self():set_joinable(true)                                             \
+    return vshard.router.callro(1501, 'echo', {1}, opts_big_timeout)            \
+end)
+ | ---
+ | ...
+fiber.sleep(0.01)
+ | ---
+ | ...
+disable_auto_masters()
+ | ---
+ | ...
+f1:join()
+ | ---
+ | - true
+ | - null
+ | - code: 6
+ |   type: ShardingError
+ |   name: MISSING_MASTER
+ |   replicaset_uuid: <replicaset_1>
+ |   message: Master is not configured for replicaset <replicaset_1>
+ | ...
+f2:join()
+ | ---
+ | - true
+ | - null
+ | - code: 6
+ |   type: ShardingError
+ |   name: MISSING_MASTER
+ |   replicaset_uuid: <replicaset_1>
+ |   message: Master is not configured for replicaset <replicaset_1>
+ | ...
+
+--
+-- Multiple masters logging.
+--
+test_run:switch('storage_1_a')
+ | ---
+ | - true
+ | ...
+replicas = cfg.sharding[util.replicasets[1]].replicas
+ | ---
+ | ...
+replicas[util.name_to_uuid.storage_1_a].master = true
+ | ---
+ | ...
+vshard.storage.cfg(cfg, instance_uuid)
+ | ---
+ | ...
+
+test_run:switch('storage_1_b')
+ | ---
+ | - true
+ | ...
+replicas = cfg.sharding[util.replicasets[1]].replicas
+ | ---
+ | ...
+replicas[util.name_to_uuid.storage_1_b].master = true
+ | ---
+ | ...
+vshard.storage.cfg(cfg, instance_uuid)
+ | ---
+ | ...
+
+test_run:switch('router_1')
+ | ---
+ | - true
+ | ...
+forget_masters()
+ | ---
+ | ...
+start_aggressive_master_search()
+ | ---
+ | ...
+test_run:wait_log('router_1', 'Found more than one master', nil, 10)
+ | ---
+ | - Found more than one master
+ | ...
+stop_aggressive_master_search()
+ | ---
+ | ...
+
+--
+-- Async request won't wait for master. Otherwise it would need to wait, which
+-- is not async behaviour. The timeout should be ignored.
+--
+do                                                                              \
+    forget_masters()                                                            \
+    return vshard.router.callrw(1501, 'echo', {1}, {                            \
+        is_async = true, timeout = big_timeout                                  \
+    })                                                                          \
+end
+ | ---
+ | - null
+ | - code: 6
+ |   type: ShardingError
+ |   name: MISSING_MASTER
+ |   replicaset_uuid: <replicaset_1>
+ |   message: Master is not configured for replicaset <replicaset_1>
+ | ...
+
+_ = test_run:switch("default")
+ | ---
+ | ...
+_ = test_run:cmd("stop server router_1")
+ | ---
+ | ...
+_ = test_run:cmd("cleanup server router_1")
+ | ---
+ | ...
+test_run:drop_cluster(REPLICASET_1)
+ | ---
+ | ...
+test_run:drop_cluster(REPLICASET_2)
+ | ---
+ | ...
+_ = test_run:cmd('clear filter')
+ | ---
+ | ...
diff --git a/test/router/master_discovery.test.lua b/test/router/master_discovery.test.lua
new file mode 100644
index 0000000..e087f58
--- /dev/null
+++ b/test/router/master_discovery.test.lua
@@ -0,0 +1,248 @@
+test_run = require('test_run').new()
+REPLICASET_1 = { 'storage_1_a', 'storage_1_b' }
+REPLICASET_2 = { 'storage_2_a', 'storage_2_b' }
+test_run:create_cluster(REPLICASET_1, 'router')
+test_run:create_cluster(REPLICASET_2, 'router')
+util = require('util')
+util.wait_master(test_run, REPLICASET_1, 'storage_1_a')
+util.wait_master(test_run, REPLICASET_2, 'storage_2_a')
+util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, 'bootstrap_storage(\'memtx\')')
+util.push_rs_filters(test_run)
+_ = test_run:cmd("create server router_1 with script='router/router_1.lua'")
+_ = test_run:cmd("start server router_1")
+
+--
+-- gh-75: automatic master discovery on router.
+--
+
+_ = test_run:switch("router_1")
+util = require('util')
+vshard.router.bootstrap()
+
+for _, rs in pairs(cfg.sharding) do                                             \
+    for _, r in pairs(rs.replicas) do                                           \
+        r.master = nil                                                          \
+    end                                                                         \
+end                                                                             \
+
+function enable_auto_masters()                                                  \
+    for _, rs in pairs(cfg.sharding) do                                         \
+        rs.master = 'auto'                                                      \
+    end                                                                         \
+    vshard.router.cfg(cfg)                                                      \
+end
+
+function disable_auto_masters()                                                 \
+    for _, rs in pairs(cfg.sharding) do                                         \
+        rs.master = nil                                                         \
+    end                                                                         \
+    vshard.router.cfg(cfg)                                                      \
+end
+
+-- But do not forget the buckets. Otherwise bucket discovery will establish
+-- the connections instead of external requests.
+function forget_masters()                                                       \
+    disable_auto_masters()                                                      \
+    enable_auto_masters()                                                       \
+end
+
+function check_all_masters_found()                                              \
+    for _, rs in pairs(vshard.router.static.replicasets) do                     \
+        if not rs.master then                                                   \
+            vshard.router.static.master_search_fiber:wakeup()                   \
+            return false                                                        \
+        end                                                                     \
+    end                                                                         \
+    return true                                                                 \
+end
+
+function check_master_for_replicaset(rs_id, master_name)                        \
+    local rs_uuid = util.replicasets[rs_id]                                     \
+    local master_uuid = util.name_to_uuid[master_name]                          \
+    local master = vshard.router.static.replicasets[rs_uuid].master             \
+    if not master or master.uuid ~= master_uuid then                            \
+        vshard.router.static.master_search_fiber:wakeup()                       \
+        return false                                                            \
+    end                                                                         \
+    return true                                                                 \
+end
+
+function check_all_buckets_found()                                              \
+    if vshard.router.info().bucket.unknown == 0 then                            \
+        return true                                                             \
+    end                                                                         \
+    vshard.router.discovery_wakeup()                                            \
+    return false                                                                \
+end
+
+master_search_helper_f = nil
+function aggressive_master_search_f()                                           \
+    while true do                                                               \
+        vshard.router.static.master_search_fiber:wakeup()                       \
+        fiber.sleep(0.001)                                                      \
+    end                                                                         \
+end
+
+function start_aggressive_master_search()                                       \
+    assert(master_search_helper_f == nil)                                       \
+    master_search_helper_f = fiber.new(aggressive_master_search_f)              \
+    master_search_helper_f:set_joinable(true)                                   \
+end
+
+function stop_aggressive_master_search()                                        \
+    assert(master_search_helper_f ~= nil)                                       \
+    master_search_helper_f:cancel()                                             \
+    master_search_helper_f:join()                                               \
+    master_search_helper_f = nil                                                \
+end
+
+--
+-- Simulate the first cfg when no masters are known.
+--
+forget_masters()
+assert(vshard.router.static.master_search_fiber ~= nil)
+test_run:wait_cond(check_all_masters_found)
+test_run:wait_cond(check_all_buckets_found)
+
+--
+-- Change master and see how router finds it again.
+--
+test_run:switch('storage_1_a')
+replicas = cfg.sharding[util.replicasets[1]].replicas
+replicas[util.name_to_uuid.storage_1_a].master = false
+replicas[util.name_to_uuid.storage_1_b].master = true
+vshard.storage.cfg(cfg, instance_uuid)
+
+test_run:switch('storage_1_b')
+replicas = cfg.sharding[util.replicasets[1]].replicas
+replicas[util.name_to_uuid.storage_1_a].master = false
+replicas[util.name_to_uuid.storage_1_b].master = true
+vshard.storage.cfg(cfg, instance_uuid)
+
+test_run:switch('router_1')
+big_timeout = 1000000
+opts_big_timeout = {timeout = big_timeout}
+test_run:wait_cond(function()                                                   \
+    return check_master_for_replicaset(1, 'storage_1_b')                        \
+end)
+vshard.router.callrw(1501, 'echo', {1}, opts_big_timeout)
+
+test_run:switch('storage_1_b')
+assert(echo_count == 1)
+echo_count = 0
+
+--
+-- Revert the master back.
+--
+test_run:switch('storage_1_a')
+replicas = cfg.sharding[util.replicasets[1]].replicas
+replicas[util.name_to_uuid.storage_1_a].master = true
+replicas[util.name_to_uuid.storage_1_b].master = false
+vshard.storage.cfg(cfg, instance_uuid)
+
+test_run:switch('storage_1_b')
+replicas = cfg.sharding[util.replicasets[1]].replicas
+replicas[util.name_to_uuid.storage_1_a].master = true
+replicas[util.name_to_uuid.storage_1_b].master = false
+vshard.storage.cfg(cfg, instance_uuid)
+
+test_run:switch('router_1')
+test_run:wait_cond(function()                                                   \
+    return check_master_for_replicaset(1, 'storage_1_a')                        \
+end)
+
+--
+-- Call tries to wait for master if has enough time left.
+--
+start_aggressive_master_search()
+
+forget_masters()
+vshard.router.callrw(1501, 'echo', {1}, opts_big_timeout)
+
+forget_masters()
+-- XXX: this should not depend on master so much. RO requests should be able to
+-- go to replicas.
+vshard.router.callro(1501, 'echo', {1}, opts_big_timeout)
+
+forget_masters()
+vshard.router.route(1501):callrw('echo', {1}, opts_big_timeout)
+
+forget_masters()
+-- XXX: the same as above - should not really wait for master. Regardless of it
+-- being auto or not.
+vshard.router.route(1501):callro('echo', {1}, opts_big_timeout)
+
+stop_aggressive_master_search()
+
+test_run:switch('storage_1_a')
+assert(echo_count == 4)
+echo_count = 0
+
+--
+-- Old replicaset objects stop waiting for master when search is disabled.
+--
+
+-- Turn off masters on the first replicaset.
+replicas = cfg.sharding[util.replicasets[1]].replicas
+replicas[util.name_to_uuid.storage_1_a].master = false
+vshard.storage.cfg(cfg, instance_uuid)
+
+test_run:switch('storage_1_b')
+replicas = cfg.sharding[util.replicasets[1]].replicas
+replicas[util.name_to_uuid.storage_1_a].master = false
+vshard.storage.cfg(cfg, instance_uuid)
+
+-- Try to make RW and RO requests but then turn of the auto search.
+test_run:switch('router_1')
+forget_masters()
+f1 = fiber.create(function()                                                    \
+    fiber.self():set_joinable(true)                                             \
+    return vshard.router.callrw(1501, 'echo', {1}, opts_big_timeout)            \
+end)
+-- XXX: should not really wait for master since this is an RO request. It could
+-- use a replica.
+f2 = fiber.create(function()                                                    \
+    fiber.self():set_joinable(true)                                             \
+    return vshard.router.callro(1501, 'echo', {1}, opts_big_timeout)            \
+end)
+fiber.sleep(0.01)
+disable_auto_masters()
+f1:join()
+f2:join()
+
+--
+-- Multiple masters logging.
+--
+test_run:switch('storage_1_a')
+replicas = cfg.sharding[util.replicasets[1]].replicas
+replicas[util.name_to_uuid.storage_1_a].master = true
+vshard.storage.cfg(cfg, instance_uuid)
+
+test_run:switch('storage_1_b')
+replicas = cfg.sharding[util.replicasets[1]].replicas
+replicas[util.name_to_uuid.storage_1_b].master = true
+vshard.storage.cfg(cfg, instance_uuid)
+
+test_run:switch('router_1')
+forget_masters()
+start_aggressive_master_search()
+test_run:wait_log('router_1', 'Found more than one master', nil, 10)
+stop_aggressive_master_search()
+
+--
+-- Async request won't wait for master. Otherwise it would need to wait, which
+-- is not async behaviour. The timeout should be ignored.
+--
+do                                                                              \
+    forget_masters()                                                            \
+    return vshard.router.callrw(1501, 'echo', {1}, {                            \
+        is_async = true, timeout = big_timeout                                  \
+    })                                                                          \
+end
+
+_ = test_run:switch("default")
+_ = test_run:cmd("stop server router_1")
+_ = test_run:cmd("cleanup server router_1")
+test_run:drop_cluster(REPLICASET_1)
+test_run:drop_cluster(REPLICASET_2)
+_ = test_run:cmd('clear filter')
diff --git a/vshard/consts.lua b/vshard/consts.lua
index 47a893b..66a09ae 100644
--- a/vshard/consts.lua
+++ b/vshard/consts.lua
@@ -52,6 +52,11 @@ return {
     DISCOVERY_WORK_STEP = 0.01,
     DISCOVERY_TIMEOUT = 10,
 
+    MASTER_SEARCH_IDLE_INTERVAL = 5,
+    MASTER_SEARCH_WORK_INTERVAL = 0.5,
+    MASTER_SEARCH_BACKOFF_INTERVAL = 5,
+    MASTER_SEARCH_TIEMOUT = 5,
+
     TIMEOUT_INFINITY = 500 * 365 * 86400,
     DEADLINE_INFINITY = math.huge,
 }
diff --git a/vshard/error.lua b/vshard/error.lua
index bcbcd71..e2d1a31 100644
--- a/vshard/error.lua
+++ b/vshard/error.lua
@@ -153,6 +153,11 @@ local error_message_template = {
         name = 'BUCKET_RECV_DATA_ERROR',
         msg = 'Can not receive the bucket %s data in space "%s" at tuple %s: %s',
         args = {'bucket_id', 'space', 'tuple', 'reason'},
+    },
+    [31] = {
+        name = 'MULTIPLE_MASTERS_FOUND',
+        msg = 'Found more than one master in replicaset %s on nodes %s and %s',
+        args = {'replicaset_uuid', 'master1', 'master2'},
     }
 }
 
diff --git a/vshard/replicaset.lua b/vshard/replicaset.lua
index fa048c9..7747258 100644
--- a/vshard/replicaset.lua
+++ b/vshard/replicaset.lua
@@ -25,6 +25,10 @@
 --          }
 --      },
 --      master = <master server from the array above>,
+--      master_cond = <condition variable signaled when the replicaset finds or
+--                     changes its master>,
+--      is_auto_master = <true when is configured to find the master on
+--                        its own>,
 --      replica = <nearest available replica object>,
 --      balance_i = <index of a next replica in priority_list to
 --                   use for a load-balanced request>,
@@ -55,6 +59,8 @@ local luuid = require('uuid')
 local ffi = require('ffi')
 local util = require('vshard.util')
 local fiber_clock = fiber.clock
+local fiber_yield = fiber.yield
+local fiber_cond_wait = util.fiber_cond_wait
 local gsc = util.generate_self_checker
 
 --
@@ -159,6 +165,26 @@ local function replicaset_connect_to_replica(replicaset, replica)
     return conn
 end
 
+local function replicaset_wait_master(replicaset, timeout)
+    local master = replicaset.master
+    -- Fast path - master is almost always known.
+    if master then
+        return master, timeout
+    end
+    -- Slow path.
+    local deadline = fiber_clock() + timeout
+    repeat
+        if not replicaset.is_auto_master or
+           not fiber_cond_wait(replicaset.master_cond, timeout) then
+            return nil, lerror.vshard(lerror.code.MISSING_MASTER,
+                                      replicaset.uuid)
+        end
+        timeout = deadline - fiber_clock()
+        master = replicaset.master
+    until master
+    return master, timeout
+end
+
 --
 -- Create net.box connection to master.
 --
@@ -175,7 +201,13 @@ end
 -- Wait until the master instance is connected.
 --
 local function replicaset_wait_connected(replicaset, timeout)
-    return netbox_wait_connected(replicaset_connect_master(replicaset), timeout)
+    local master
+    master, timeout = replicaset_wait_master(replicaset, timeout)
+    if not master then
+        return nil, timeout
+    end
+    local conn = replicaset_connect_to_replica(replicaset, master)
+    return netbox_wait_connected(conn, timeout)
 end
 
 --
@@ -345,18 +377,30 @@ local function replicaset_master_call(replicaset, func, args, opts)
     assert(opts == nil or type(opts) == 'table')
     assert(type(func) == 'string', 'function name')
     assert(args == nil or type(args) == 'table', 'function arguments')
-    local conn, err = replicaset_connect_master(replicaset)
-    if not conn then
-        return nil, err
-    end
-    if not opts then
-        opts = {timeout = replicaset.master.net_timeout}
-    elseif not opts.timeout then
-        opts = table.copy(opts)
-        opts.timeout = replicaset.master.net_timeout
+    local master = replicaset.master
+    if not master then
+        opts = opts and table.copy(opts) or {}
+        if opts.is_async then
+            return nil, lerror.vshard(lerror.code.MISSING_MASTER,
+                                      replicaset.uuid)
+        end
+        local timeout = opts.timeout or consts.MASTER_SEARCH_TIEMOUT
+        master, timeout = replicaset_wait_master(replicaset, timeout)
+        if not master then
+            return nil, timeout
+        end
+        opts.timeout = timeout
+    else
+        if not opts then
+            opts = {timeout = master.net_timeout}
+        elseif not opts.timeout then
+            opts = table.copy(opts)
+            opts.timeout = master.net_timeout
+        end
     end
+    replicaset_connect_to_replica(replicaset, master)
     local net_status, storage_status, retval, error_object =
-        replica_call(replicaset.master, func, args, opts)
+        replica_call(master, func, args, opts)
     -- Ignore net_status - master does not retry requests.
     return storage_status, retval, error_object
 end
@@ -425,11 +469,6 @@ local function replicaset_template_multicallro(prefer_replica, balance)
                 return r
             end
         end
-        local conn, err = replicaset_connect_master(replicaset)
-        if not conn then
-            return nil, err
-        end
-        return master
     end
 
     return function(replicaset, func, args, opts)
@@ -444,9 +483,13 @@ local function replicaset_template_multicallro(prefer_replica, balance)
         end
         local end_time = fiber_clock() + timeout
         while not net_status and timeout > 0 do
-            replica, err = pick_next_replica(replicaset)
+            replica = pick_next_replica(replicaset)
             if not replica then
-                return nil, err
+                replica, timeout = replicaset_wait_master(replicaset, timeout)
+                if not replica then
+                    return nil, timeout
+                end
+                replicaset_connect_to_replica(replicaset, replica)
             end
             opts.timeout = timeout
             net_status, storage_status, retval, err =
@@ -508,7 +551,128 @@ local function rebind_replicasets(replicasets, old_replicasets)
                 end
             end
         end
+        if old_replicaset then
+            -- Take a hint from the old replicaset about who the master is now.
+            if replicaset.is_auto_master then
+                local master = old_replicaset.master
+                if master then
+                    replicaset.master = replicaset.replicas[master.uuid]
+                end
+            end
+            -- Stop waiting for master in the old replicaset. Its running
+            -- requests won't find it anyway. Auto search works only for the
+            -- most recent replicaset objects.
+            if old_replicaset.is_auto_master then
+                old_replicaset.is_auto_master = false
+                old_replicaset.master_cond:broadcast()
+            end
+        end
+    end
+end
+
+--
+-- Check if the master is still the master, and find a new master if no master
+-- is known.
+--
+local function replicaset_locate_master(replicaset)
+    local is_done = true
+    local is_nop = true
+    if not replicaset.is_auto_master then
+        return is_done, is_nop
+    end
+    local func = 'vshard.storage._call'
+    local args = {'info'}
+    local const_timeout = consts.MASTER_SEARCH_TIEMOUT
+    local sync_opts = {timeout = const_timeout}
+    local ok, res, err, f
+    local master = replicaset.master
+    if master then
+        ok, res, err = replica_call(master, func, args, sync_opts)
+        if not ok then
+            return is_done, is_nop, err
+        end
+        if res.is_master then
+            return is_done, is_nop
+        end
+        log.info('Master of replicaset %s, node %s, has resigned. Trying '..
+                 'to find a new one', replicaset.uuid, master.uuid)
+        replicaset.master = nil
+    end
+    is_nop = false
+
+    local last_err
+    local futures = {}
+    local timeout = const_timeout
+    local deadline = fiber_clock() + timeout
+    local async_opts = {is_async = true}
+    local replicaset_uuid = replicaset.uuid
+    for replica_uuid, replica in pairs(replicaset.replicas) do
+        local conn = replica.conn
+        timeout, err = netbox_wait_connected(conn, timeout)
+        if not timeout then
+            last_err = err
+            timeout = deadline - fiber_clock()
+        else
+            ok, f = pcall(conn.call, conn, func, args, async_opts)
+            if not ok then
+                last_err = lerror.make(f)
+            else
+                futures[replica_uuid] = f
+            end
+        end
+    end
+    local master_uuid
+    for replica_uuid, f in pairs(futures) do
+        if timeout < 0 then
+            -- Netbox uses a cond var inside, whose wait throws an error when it
+            -- gets a negative timeout. Avoid that.
+            timeout = 0
+        end
+        res, err = f:wait_result(timeout)
+        timeout = deadline - fiber_clock()
+        if not res then
+            f:discard()
+            last_err = err
+            goto next_wait
+        end
+        res = res[1]
+        if not res.is_master then
+            goto next_wait
+        end
+        if not master_uuid then
+            master_uuid = replica_uuid
+            goto next_wait
+        end
+        is_done = false
+        last_err = lerror.vshard(lerror.code.MULTIPLE_MASTERS_FOUND,
+                                 replicaset_uuid, master_uuid, replica_uuid)
+        do return is_done, is_nop, last_err end
+        ::next_wait::
+    end
+    master = replicaset.replicas[master_uuid]
+    if master then
+        log.info('Found master for replicaset %s: %s', replicaset_uuid,
+                 master_uuid)
+        replicaset.master = master
+        replicaset.master_cond:broadcast()
+    else
+        is_done = false
+    end
+    return is_done, is_nop, last_err
+end
+
+local function locate_masters(replicasets)
+    local is_all_done = true
+    local is_all_nop = true
+    local last_err
+    for _, replicaset in pairs(replicasets) do
+        local is_done, is_nop, err = replicaset_locate_master(replicaset)
+        is_all_done = is_all_done and is_done
+        is_all_nop = is_all_nop and is_nop
+        last_err = err or last_err
+        fiber_yield()
     end
+    return is_all_done, is_all_nop, last_err
 end
 
 --
@@ -729,6 +893,8 @@ local function buildall(sharding_cfg)
             bucket_count = 0,
             lock = replicaset.lock,
             balance_i = 1,
+            is_auto_master = replicaset.master == 'auto',
+            master_cond = fiber.cond(),
         }, replicaset_mt)
         local priority_list = {}
         for replica_uuid, replica in pairs(replicaset.replicas) do
@@ -802,4 +968,5 @@ return {
     wait_masters_connect = wait_masters_connect,
     rebind_replicasets = rebind_replicasets,
     outdate_replicasets = outdate_replicasets,
+    locate_masters = locate_masters,
 }
diff --git a/vshard/router/init.lua b/vshard/router/init.lua
index 5e2a96b..9407ccd 100644
--- a/vshard/router/init.lua
+++ b/vshard/router/init.lua
@@ -68,6 +68,8 @@ local ROUTER_TEMPLATE = {
         replicasets = nil,
         -- Fiber to maintain replica connections.
         failover_fiber = nil,
+        -- Fiber to watch for master changes and find new masters.
+        master_search_fiber = nil,
         -- Fiber to discovery buckets in background.
         discovery_fiber = nil,
         -- How discovery works. On - work infinitely. Off - no
@@ -1030,6 +1032,93 @@ local function failover_f(router)
     end
 end
 
+--------------------------------------------------------------------------------
+-- Master search
+--------------------------------------------------------------------------------
+
+local function master_search_step(router)
+    local ok, is_done, is_nop, err = pcall(lreplicaset.locate_masters,
+                                           router.replicasets)
+    if not ok then
+        err = is_done
+        is_done = false
+        is_nop = false
+    end
+    return is_done, is_nop, err
+end
+
+--
+-- Master discovery background function. It is supposed to notice master changes
+-- and find new masters in the replicasets, which are configured for that.
+--
+-- XXX: due to polling, the search might not notice a master change right when
+-- it happens. In the future it makes sense to rewrite master search using
+-- subscriptions. The problem is that at the moment of writing the subscriptions
+-- are not working well in all Tarantool versions.
+--
+local function master_search_f(router)
+    local module_version = M.module_version
+    local is_in_progress = false
+    while module_version == M.module_version do
+        local timeout
+        local start_time = fiber_clock()
+        local is_done, is_nop, err = master_search_step(router)
+        if err then
+            log.error('Error during master search: %s', lerror.make(err))
+        end
+        if is_done then
+            timeout = consts.MASTER_SEARCH_IDLE_INTERVAL
+        elseif err then
+            timeout = consts.MASTER_SEARCH_BACKOFF_INTERVAL
+        else
+            timeout = consts.MASTER_SEARCH_WORK_INTERVAL
+        end
+        if not is_in_progress then
+            if not is_nop and is_done then
+                log.info('Master search happened')
+            elseif not is_done then
+                log.info('Master search is started')
+                is_in_progress = true
+            end
+        elseif is_done then
+            log.info('Master search is finished')
+            is_in_progress = false
+        end
+        local end_time = fiber_clock()
+        local duration = end_time - start_time
+        if not is_nop then
+            log.verbose('Master search step took %s seconds. Next in %s '..
+                        'seconds', duration, timeout)
+        end
+        lfiber.sleep(timeout)
+    end
+end
+
+local function master_search_set(router)
+    local enable = false
+    for _, rs in pairs(router.replicasets) do
+        if rs.is_auto_master then
+            enable = true
+            break
+        end
+    end
+    local search_fiber = router.master_search_fiber
+    if enable and search_fiber == nil then
+        log.info('Master auto search is enabled')
+        router.master_search_fiber = util.reloadable_fiber_create(
+            'vshard.master_search.' .. router.name, M, 'master_search_f',
+            router)
+    elseif not enable and search_fiber ~= nil then
+        -- Do not make users pay for what they do not use - when the search is
+        -- disabled for all replicasets, there should not be any fiber.
+        log.info('Master auto search is disabled')
+        if search_fiber:status() ~= 'dead' then
+            search_fiber:cancel()
+        end
+        router.master_search_fiber = nil
+    end
+end
+
 --------------------------------------------------------------------------------
 -- Configuration
 --------------------------------------------------------------------------------
@@ -1100,6 +1189,7 @@ local function router_cfg(router, cfg, is_reload)
             'vshard.failover.' .. router.name, M, 'failover_f', router)
     end
     discovery_set(router, vshard_cfg.discovery_mode)
+    master_search_set(router)
 end
 
 --------------------------------------------------------------------------------
@@ -1535,6 +1625,10 @@ end
 --------------------------------------------------------------------------------
 -- Module definition
 --------------------------------------------------------------------------------
+M.discovery_f = discovery_f
+M.failover_f = failover_f
+M.master_search_f = master_search_f
+M.router_mt = router_mt
 --
 -- About functions, saved in M, and reloading see comment in
 -- storage/init.lua.
@@ -1556,10 +1650,6 @@ else
     M.module_version = M.module_version + 1
 end
 
-M.discovery_f = discovery_f
-M.failover_f = failover_f
-M.router_mt = router_mt
-
 module.cfg = legacy_cfg
 module.new = router_new
 module.internal = M
-- 
2.24.3 (Apple Git-128)



More information about the Tarantool-patches mailing list