[Tarantool-patches] [PATCH vshard 11/11] router: introduce map_callrw()
Oleg Babin
olegrok at tarantool.org
Wed Feb 24 13:28:17 MSK 2021
Thanks a lot for your patch! See 5 comments below.
On 23.02.2021 03:15, Vladislav Shpilevoy wrote:
> Closes #147
Will read-only map-reduce functions be done in the scope of a
separate issue/patch?
I know about #173, but it seems we need to keep information about a
map_callro function.
> @TarantoolBot document
> Title: vshard.router.map_callrw()
>
> `vshard.router.map_callrw()` implements consistent map-reduce over
> the entire cluster. Consistency means all the data was accessible
> and didn't move during map request execution.
>
> It is useful when one needs to access potentially all the data in
> the cluster, or simply a huge number of buckets scattered over the
> instances, for which individual `vshard.router.call()`s would take
> too long.
>
> `map_callrw()` takes the name of the function to call on the
> storages, arguments in the format of an array, and an optional
> options map. The only supported option for now is `timeout`, which
> is applied to the entire call, not to individual calls for each
> storage.
> ```
> vshard.router.map_callrw(func_name, args[, {timeout = <seconds>}])
> ```
>
> The chosen function is called on the master node of each
> replicaset with the given arguments.
>
> In case of success `vshard.router.map_callrw()` returns a map with
> replicaset UUIDs as keys and results of the user's function as
> values, like this:
> ```
> {uuid1 = {res1}, uuid2 = {res2}, ...}
> ```
> If the function returned `nil` or `box.NULL` on one of the
> storages, that storage won't be present in the result map.
>
> In case of failure it returns `nil`, an error object, and an
> optional UUID of the replicaset where the error happened. The UUID
> may be absent if the error wasn't related to a concrete replicaset.
>
> For instance, the method fails if not all buckets were found, even
> if all replicasets were scanned successfully.
>
> Handling the result looks like this:
> ```Lua
> res, err, uuid = vshard.router.map_callrw(...)
> if not res then
> -- Error.
> -- 'err' - error object. 'uuid' - optional UUID of replicaset
> -- where the error happened.
> ...
> else
> -- Success.
> for uuid, value in pairs(res) do
> ...
> end
> end
> ```
>
> Map-Reduce in vshard works in 3 stages: Ref, Map, Reduce. Ref is
> an internal stage which is supposed to ensure data consistency
> during the user's function execution on all nodes.
>
> Reduce is not performed by vshard. It is what the user's code does
> with the results of `map_callrw()`.
>
> Consistency, as it is defined for map-reduce, is not compatible
> with rebalancing, because any bucket move would make the sender
> and receiver nodes 'inconsistent' - it would not be possible to
> call a function on them which could simply access all the data
> without doing `vshard.storage.bucket_ref()`.
>
> This makes the Ref stage very intricate, as it must work together
> with the rebalancer to ensure that neither blocks the other.
>
> For this, the storage has a scheduler specifically for bucket
> moves and storage refs, which shares storage time between them
> fairly.
>
> The definition of fairness depends on how long and frequent the
> moves and refs are. This can be configured using the storage
> options `sched_move_quota` and `sched_ref_quota`. See more details
> about them in the corresponding doc section.
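>
> For illustration, tuning these quotas in the storage configuration
> might look like this (the quota values here are made up; see the
> scheduler doc section for real guidance):
> ```Lua
> local cfg = {
>     sharding = topology, -- the usual sharding topology
>     -- How many requests of one type may pass in a row while
>     -- requests of the other type are waiting.
>     sched_move_quota = 1,
>     sched_ref_quota = 300,
> }
> vshard.storage.cfg(cfg, instance_uuid)
> ```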
>
> The scheduler configuration may affect map-reduce requests if they
> are used a lot during rebalancing.
>
> Keep in mind that it is not a good idea to use too big a timeout
> for `map_callrw()`, because the router will try to block bucket
> moves for the given timeout on all storages, and in case something
> goes wrong, the block will remain for the entire timeout. In
> particular, this means a timeout longer than, say, a few minutes
> is a very bad idea unless it is for tests only.
>
> Also it is important to remember that `map_callrw()` does not
> work on replicas. It works only on masters. This makes it unusable
> if at least one replicaset has its master node down.
> ---
> test/router/map-reduce.result | 636 ++++++++++++++++++++++++++++++++
> test/router/map-reduce.test.lua | 258 +++++++++++++
> test/router/router.result | 9 +-
> test/upgrade/upgrade.result | 5 +-
> vshard/replicaset.lua | 34 ++
> vshard/router/init.lua | 180 +++++++++
> vshard/storage/init.lua | 47 +++
> 7 files changed, 1164 insertions(+), 5 deletions(-)
> create mode 100644 test/router/map-reduce.result
> create mode 100644 test/router/map-reduce.test.lua
>
> diff --git a/test/router/map-reduce.result b/test/router/map-reduce.result
> new file mode 100644
> index 0000000..1e8995a
> --- /dev/null
> +++ b/test/router/map-reduce.result
> @@ -0,0 +1,636 @@
> +-- test-run result file version 2
> +test_run = require('test_run').new()
> + | ---
> + | ...
> +REPLICASET_1 = { 'storage_1_a', 'storage_1_b' }
> + | ---
> + | ...
> +REPLICASET_2 = { 'storage_2_a', 'storage_2_b' }
> + | ---
> + | ...
> +test_run:create_cluster(REPLICASET_1, 'router')
> + | ---
> + | ...
> +test_run:create_cluster(REPLICASET_2, 'router')
> + | ---
> + | ...
> +util = require('util')
> + | ---
> + | ...
> +util.wait_master(test_run, REPLICASET_1, 'storage_1_a')
> + | ---
> + | ...
> +util.wait_master(test_run, REPLICASET_2, 'storage_2_a')
> + | ---
> + | ...
> +util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, 'bootstrap_storage()')
> + | ---
> + | ...
> +util.push_rs_filters(test_run)
> + | ---
> + | ...
> +_ = test_run:cmd("create server router_1 with script='router/router_1.lua'")
> + | ---
> + | ...
> +_ = test_run:cmd("start server router_1")
> + | ---
> + | ...
> +
> +_ = test_run:switch("router_1")
> + | ---
> + | ...
> +util = require('util')
> + | ---
> + | ...
> +
> +--
> +-- gh-147: consistent map-reduce.
> +--
> +big_timeout = 1000000
> + | ---
> + | ...
> +big_timeout_opts = {timeout = big_timeout}
> + | ---
> + | ...
> +vshard.router.cfg(cfg)
> + | ---
> + | ...
> +vshard.router.bootstrap(big_timeout_opts)
> + | ---
> + | - true
> + | ...
> +-- Trivial basic sanity test. Multireturn is not supported, should be truncated.
> +vshard.router.map_callrw('echo', {1, 2, 3}, big_timeout_opts)
> + | ---
> + | - <replicaset_2>:
> + | - 1
> + | <replicaset_1>:
> + | - 1
> + | ...
> +
> +--
> +-- Fail during connecting to the storages. For the storages that succeeded,
> +-- the router tries to send unrefs.
> +--
> +timeout = 0.001
> + | ---
> + | ...
> +timeout_opts = {timeout = timeout}
> + | ---
> + | ...
> +
> +test_run:cmd('stop server storage_1_a')
> + | ---
> + | - true
> + | ...
> +ok, err = vshard.router.map_callrw('echo', {1}, timeout_opts)
> + | ---
> + | ...
> +assert(not ok and err.message)
> + | ---
> + | - Timeout exceeded
> + | ...
> +-- Even if ref was sent successfully to storage_2_a, it was deleted before
> +-- router returned an error.
> +_ = test_run:switch('storage_2_a')
> + | ---
> + | ...
> +lref = require('vshard.storage.ref')
> + | ---
> + | ...
> +-- Wait because the unref is sent asynchronously. It may not arrive immediately.
> +test_run:wait_cond(function() return lref.count == 0 end)
> + | ---
> + | - true
> + | ...
> +
> +_ = test_run:switch('router_1')
> + | ---
> + | ...
> +test_run:cmd('start server storage_1_a')
> + | ---
> + | - true
> + | ...
> +-- Works again - the router waited for the connection to be established.
> +vshard.router.map_callrw('echo', {1}, big_timeout_opts)
> + | ---
> + | - <replicaset_2>:
> + | - 1
> + | <replicaset_1>:
> + | - 1
> + | ...
> +
> +--
> +-- Do all the same but with another storage being stopped. The same test is
> +-- done again because we can't tell which of the storages the router will
> +-- go to first.
> +--
> +test_run:cmd('stop server storage_2_a')
> + | ---
> + | - true
> + | ...
> +ok, err = vshard.router.map_callrw('echo', {1}, timeout_opts)
> + | ---
> + | ...
> +assert(not ok and err.message)
> + | ---
> + | - Timeout exceeded
> + | ...
> +_ = test_run:switch('storage_1_a')
> + | ---
> + | ...
> +lref = require('vshard.storage.ref')
> + | ---
> + | ...
> +test_run:wait_cond(function() return lref.count == 0 end)
> + | ---
> + | - true
> + | ...
> +
> +_ = test_run:switch('router_1')
> + | ---
> + | ...
> +test_run:cmd('start server storage_2_a')
> + | ---
> + | - true
> + | ...
> +vshard.router.map_callrw('echo', {1}, big_timeout_opts)
> + | ---
> + | - <replicaset_2>:
> + | - 1
> + | <replicaset_1>:
> + | - 1
> + | ...
> +
> +--
> +-- Fail at ref stage handling. Unrefs are sent to cancel those refs which
> +-- succeeded. To simulate a ref fail make the router think there is a moving
> +-- bucket.
> +--
> +_ = test_run:switch('storage_1_a')
> + | ---
> + | ...
> +lsched = require('vshard.storage.sched')
> + | ---
> + | ...
> +big_timeout = 1000000
> + | ---
> + | ...
> +lsched.move_start(big_timeout)
> + | ---
> + | - 1000000
> + | ...
> +
> +_ = test_run:switch('router_1')
> + | ---
> + | ...
> +ok, err = vshard.router.map_callrw('echo', {1}, timeout_opts)
> + | ---
> + | ...
> +assert(not ok and err.message)
> + | ---
> + | - Timeout exceeded
> + | ...
> +
> +_ = test_run:switch('storage_2_a')
> + | ---
> + | ...
> +lsched = require('vshard.storage.sched')
> + | ---
> + | ...
> +lref = require('vshard.storage.ref')
> + | ---
> + | ...
> +test_run:wait_cond(function() return lref.count == 0 end)
> + | ---
> + | - true
> + | ...
> +
> +--
> +-- Do all the same with another storage being busy with a 'move'.
> +--
> +big_timeout = 1000000
> + | ---
> + | ...
> +lsched.move_start(big_timeout)
> + | ---
> + | - 1000000
> + | ...
> +
> +_ = test_run:switch('storage_1_a')
> + | ---
> + | ...
> +lref = require('vshard.storage.ref')
> + | ---
> + | ...
> +lsched.move_end(1)
> + | ---
> + | ...
> +assert(lref.count == 0)
> + | ---
> + | - true
> + | ...
> +
> +_ = test_run:switch('router_1')
> + | ---
> + | ...
> +ok, err = vshard.router.map_callrw('echo', {1}, timeout_opts)
> + | ---
> + | ...
> +assert(not ok and err.message)
> + | ---
> + | - Timeout exceeded
> + | ...
> +
> +_ = test_run:switch('storage_1_a')
> + | ---
> + | ...
> +test_run:wait_cond(function() return lref.count == 0 end)
> + | ---
> + | - true
> + | ...
> +
> +_ = test_run:switch('storage_2_a')
> + | ---
> + | ...
> +lsched.move_end(1)
> + | ---
> + | ...
> +assert(lref.count == 0)
> + | ---
> + | - true
> + | ...
> +
> +_ = test_run:switch('router_1')
> + | ---
> + | ...
> +vshard.router.map_callrw('echo', {1}, big_timeout_opts)
> + | ---
> + | - <replicaset_2>:
> + | - 1
> + | <replicaset_1>:
> + | - 1
> + | ...
> +
> +--
> +-- Ref can fail earlier than by a timeout. The router still should broadcast
> +-- unrefs correctly. To simulate a ref fail, add a duplicate manually.
> +--
> +_ = test_run:switch('storage_1_a')
> + | ---
> + | ...
> +box.schema.user.grant('storage', 'super')
> + | ---
> + | ...
> +router_sid = nil
> + | ---
> + | ...
> +function save_router_sid() \
> + router_sid = box.session.id() \
> +end
> + | ---
> + | ...
> +
> +_ = test_run:switch('storage_2_a')
> + | ---
> + | ...
> +box.schema.user.grant('storage', 'super')
> + | ---
> + | ...
> +router_sid = nil
> + | ---
> + | ...
> +function save_router_sid() \
> + router_sid = box.session.id() \
> +end
> + | ---
> + | ...
> +
> +_ = test_run:switch('router_1')
> + | ---
> + | ...
> +vshard.router.map_callrw('save_router_sid', {}, big_timeout_opts)
> + | ---
> + | - []
> + | ...
> +
> +_ = test_run:switch('storage_1_a')
> + | ---
> + | ...
> +lref.add(1, router_sid, big_timeout)
> + | ---
> + | - true
> + | ...
> +
> +_ = test_run:switch('router_1')
> + | ---
> + | ...
> +vshard.router.internal.ref_id = 1
> + | ---
> + | ...
> +ok, err = vshard.router.map_callrw('echo', {1}, big_timeout_opts)
> + | ---
> + | ...
> +assert(not ok and err.message)
> + | ---
> + | - 'Can not add a storage ref: duplicate ref'
> + | ...
> +
> +_ = test_run:switch('storage_1_a')
> + | ---
> + | ...
> +_ = lref.del(1, router_sid)
> + | ---
> + | ...
> +
> +_ = test_run:switch('storage_2_a')
> + | ---
> + | ...
> +test_run:wait_cond(function() return lref.count == 0 end)
> + | ---
> + | - true
> + | ...
> +lref.add(1, router_sid, big_timeout)
> + | ---
> + | - true
> + | ...
> +
> +_ = test_run:switch('router_1')
> + | ---
> + | ...
> +vshard.router.internal.ref_id = 1
> + | ---
> + | ...
> +ok, err = vshard.router.map_callrw('echo', {1}, big_timeout_opts)
> + | ---
> + | ...
> +assert(not ok and err.message)
> + | ---
> + | - 'Can not add a storage ref: duplicate ref'
> + | ...
> +
> +_ = test_run:switch('storage_2_a')
> + | ---
> + | ...
> +_ = lref.del(1, router_sid)
> + | ---
> + | ...
> +
> +_ = test_run:switch('storage_1_a')
> + | ---
> + | ...
> +test_run:wait_cond(function() return lref.count == 0 end)
> + | ---
> + | - true
> + | ...
> +
> +--
> +-- Fail if some buckets are not visible, even if all the known replicasets
> +-- were scanned. It means a consistency violation.
> +--
> +_ = test_run:switch('storage_1_a')
> + | ---
> + | ...
> +bucket_id = box.space._bucket.index.pk:min().id
> + | ---
> + | ...
> +vshard.storage.bucket_force_drop(bucket_id)
> + | ---
> + | - true
> + | ...
> +
> +_ = test_run:switch('router_1')
> + | ---
> + | ...
> +ok, err = vshard.router.map_callrw('echo', {1}, big_timeout_opts)
> + | ---
> + | ...
> +assert(not ok and err.message)
> + | ---
> + | - 1 buckets are not discovered
> + | ...
> +
> +_ = test_run:switch('storage_1_a')
> + | ---
> + | ...
> +test_run:wait_cond(function() return lref.count == 0 end)
> + | ---
> + | - true
> + | ...
> +vshard.storage.bucket_force_create(bucket_id)
> + | ---
> + | - true
> + | ...
> +
> +_ = test_run:switch('storage_2_a')
> + | ---
> + | ...
> +test_run:wait_cond(function() return lref.count == 0 end)
> + | ---
> + | - true
> + | ...
> +bucket_id = box.space._bucket.index.pk:min().id
> + | ---
> + | ...
> +vshard.storage.bucket_force_drop(bucket_id)
> + | ---
> + | - true
> + | ...
> +
> +_ = test_run:switch('router_1')
> + | ---
> + | ...
> +ok, err = vshard.router.map_callrw('echo', {1}, big_timeout_opts)
> + | ---
> + | ...
> +assert(not ok and err.message)
> + | ---
> + | - 1 buckets are not discovered
> + | ...
> +
> +_ = test_run:switch('storage_2_a')
> + | ---
> + | ...
> +test_run:wait_cond(function() return lref.count == 0 end)
> + | ---
> + | - true
> + | ...
> +vshard.storage.bucket_force_create(bucket_id)
> + | ---
> + | - true
> + | ...
> +
> +_ = test_run:switch('storage_1_a')
> + | ---
> + | ...
> +test_run:wait_cond(function() return lref.count == 0 end)
> + | ---
> + | - true
> + | ...
> +
> +--
> +-- Storage map unit tests.
> +--
> +
> +-- Map fails when it is not able to use the ref.
> +ok, err = vshard.storage._call('storage_map', 0, 'echo', {1})
> + | ---
> + | ...
> +ok, err.message
> + | ---
> + | - null
> + | - 'Can not use a storage ref: no session'
> + | ...
> +
> +-- Map fails and clears the ref when the user function fails.
> +vshard.storage._call('storage_ref', 0, big_timeout)
> + | ---
> + | - 1500
> + | ...
> +assert(lref.count == 1)
> + | ---
> + | - true
> + | ...
> +ok, err = vshard.storage._call('storage_map', 0, 'raise_client_error', {})
> + | ---
> + | ...
> +assert(lref.count == 0)
> + | ---
> + | - true
> + | ...
> +assert(not ok and err.message)
> + | ---
> + | - Unknown error
> + | ...
> +
> +-- Map fails gracefully when it couldn't delete the ref.
> +vshard.storage._call('storage_ref', 0, big_timeout)
> + | ---
> + | - 1500
> + | ...
> +ok, err = vshard.storage._call('storage_map', 0, 'vshard.storage._call', \
> + {'storage_unref', 0})
> + | ---
> + | ...
> +assert(lref.count == 0)
> + | ---
> + | - true
> + | ...
> +assert(not ok and err.message)
> + | ---
> + | - 'Can not delete a storage ref: no ref'
> + | ...
> +
> +--
> +-- Map fail is handled and the router tries to send unrefs.
> +--
> +_ = test_run:switch('storage_1_a')
> + | ---
> + | ...
> +need_throw = true
> + | ---
> + | ...
> +function map_throw() \
> + if need_throw then \
> + raise_client_error() \
> + end \
> + return '+' \
> +end
> + | ---
> + | ...
> +
> +_ = test_run:switch('storage_2_a')
> + | ---
> + | ...
> +need_throw = false
> + | ---
> + | ...
> +function map_throw() \
> + if need_throw then \
> + raise_client_error() \
> + end \
> + return '+' \
> +end
> + | ---
> + | ...
> +
> +_ = test_run:switch('router_1')
> + | ---
> + | ...
> +ok, err = vshard.router.map_callrw('raise_client_error', {}, big_timeout_opts)
> + | ---
> + | ...
> +ok, err.message
> + | ---
> + | - null
> + | - Unknown error
> + | ...
> +
> +_ = test_run:switch('storage_1_a')
> + | ---
> + | ...
> +need_throw = false
> + | ---
> + | ...
> +test_run:wait_cond(function() return lref.count == 0 end)
> + | ---
> + | - true
> + | ...
> +
> +_ = test_run:switch('storage_2_a')
> + | ---
> + | ...
> +need_throw = true
> + | ---
> + | ...
> +test_run:wait_cond(function() return lref.count == 0 end)
> + | ---
> + | - true
> + | ...
> +
> +_ = test_run:switch('router_1')
> + | ---
> + | ...
> +ok, err = vshard.router.map_callrw('raise_client_error', {}, big_timeout_opts)
> + | ---
> + | ...
> +ok, err.message
> + | ---
> + | - null
> + | - Unknown error
> + | ...
> +
> +_ = test_run:switch('storage_1_a')
> + | ---
> + | ...
> +test_run:wait_cond(function() return lref.count == 0 end)
> + | ---
> + | - true
> + | ...
> +
> +_ = test_run:switch('storage_2_a')
> + | ---
> + | ...
> +test_run:wait_cond(function() return lref.count == 0 end)
> + | ---
> + | - true
> + | ...
> +
> +_ = test_run:switch('default')
> + | ---
> + | ...
> +_ = test_run:cmd("stop server router_1")
> + | ---
> + | ...
> +_ = test_run:cmd("cleanup server router_1")
> + | ---
> + | ...
> +test_run:drop_cluster(REPLICASET_1)
> + | ---
> + | ...
> +test_run:drop_cluster(REPLICASET_2)
> + | ---
> + | ...
> +_ = test_run:cmd('clear filter')
> + | ---
> + | ...
> diff --git a/test/router/map-reduce.test.lua b/test/router/map-reduce.test.lua
> new file mode 100644
> index 0000000..3b63248
> --- /dev/null
> +++ b/test/router/map-reduce.test.lua
> @@ -0,0 +1,258 @@
> +test_run = require('test_run').new()
> +REPLICASET_1 = { 'storage_1_a', 'storage_1_b' }
> +REPLICASET_2 = { 'storage_2_a', 'storage_2_b' }
> +test_run:create_cluster(REPLICASET_1, 'router')
> +test_run:create_cluster(REPLICASET_2, 'router')
> +util = require('util')
> +util.wait_master(test_run, REPLICASET_1, 'storage_1_a')
> +util.wait_master(test_run, REPLICASET_2, 'storage_2_a')
> +util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, 'bootstrap_storage()')
> +util.push_rs_filters(test_run)
> +_ = test_run:cmd("create server router_1 with script='router/router_1.lua'")
> +_ = test_run:cmd("start server router_1")
> +
> +_ = test_run:switch("router_1")
> +util = require('util')
> +
> +--
> +-- gh-147: consistent map-reduce.
> +--
> +big_timeout = 1000000
> +big_timeout_opts = {timeout = big_timeout}
> +vshard.router.cfg(cfg)
> +vshard.router.bootstrap(big_timeout_opts)
> +-- Trivial basic sanity test. Multireturn is not supported, should be truncated.
> +vshard.router.map_callrw('echo', {1, 2, 3}, big_timeout_opts)
> +
> +--
> +-- Fail during connecting to the storages. For the storages that succeeded,
> +-- the router tries to send unrefs.
> +--
> +timeout = 0.001
> +timeout_opts = {timeout = timeout}
> +
> +test_run:cmd('stop server storage_1_a')
> +ok, err = vshard.router.map_callrw('echo', {1}, timeout_opts)
> +assert(not ok and err.message)
> +-- Even if ref was sent successfully to storage_2_a, it was deleted before
> +-- router returned an error.
> +_ = test_run:switch('storage_2_a')
> +lref = require('vshard.storage.ref')
> +-- Wait because the unref is sent asynchronously. It may not arrive immediately.
> +test_run:wait_cond(function() return lref.count == 0 end)
> +
> +_ = test_run:switch('router_1')
> +test_run:cmd('start server storage_1_a')
> +-- Works again - the router waited for the connection to be established.
> +vshard.router.map_callrw('echo', {1}, big_timeout_opts)
> +
> +--
> +-- Do all the same but with another storage being stopped. The same test is
> +-- done again because we can't tell which of the storages the router will
> +-- go to first.
> +--
> +test_run:cmd('stop server storage_2_a')
> +ok, err = vshard.router.map_callrw('echo', {1}, timeout_opts)
> +assert(not ok and err.message)
> +_ = test_run:switch('storage_1_a')
> +lref = require('vshard.storage.ref')
> +test_run:wait_cond(function() return lref.count == 0 end)
> +
> +_ = test_run:switch('router_1')
> +test_run:cmd('start server storage_2_a')
> +vshard.router.map_callrw('echo', {1}, big_timeout_opts)
> +
> +--
> +-- Fail at ref stage handling. Unrefs are sent to cancel those refs which
> +-- succeeded. To simulate a ref fail make the router think there is a moving
> +-- bucket.
> +--
> +_ = test_run:switch('storage_1_a')
> +lsched = require('vshard.storage.sched')
> +big_timeout = 1000000
> +lsched.move_start(big_timeout)
> +
> +_ = test_run:switch('router_1')
> +ok, err = vshard.router.map_callrw('echo', {1}, timeout_opts)
> +assert(not ok and err.message)
> +
> +_ = test_run:switch('storage_2_a')
> +lsched = require('vshard.storage.sched')
> +lref = require('vshard.storage.ref')
> +test_run:wait_cond(function() return lref.count == 0 end)
> +
> +--
> +-- Do all the same with another storage being busy with a 'move'.
> +--
> +big_timeout = 1000000
> +lsched.move_start(big_timeout)
> +
> +_ = test_run:switch('storage_1_a')
> +lref = require('vshard.storage.ref')
> +lsched.move_end(1)
> +assert(lref.count == 0)
> +
> +_ = test_run:switch('router_1')
> +ok, err = vshard.router.map_callrw('echo', {1}, timeout_opts)
> +assert(not ok and err.message)
> +
> +_ = test_run:switch('storage_1_a')
> +test_run:wait_cond(function() return lref.count == 0 end)
> +
> +_ = test_run:switch('storage_2_a')
> +lsched.move_end(1)
> +assert(lref.count == 0)
> +
> +_ = test_run:switch('router_1')
> +vshard.router.map_callrw('echo', {1}, big_timeout_opts)
> +
> +--
> +-- Ref can fail earlier than by a timeout. The router still should broadcast
> +-- unrefs correctly. To simulate a ref fail, add a duplicate manually.
> +--
> +_ = test_run:switch('storage_1_a')
> +box.schema.user.grant('storage', 'super')
> +router_sid = nil
> +function save_router_sid() \
> + router_sid = box.session.id() \
> +end
> +
> +_ = test_run:switch('storage_2_a')
> +box.schema.user.grant('storage', 'super')
> +router_sid = nil
> +function save_router_sid() \
> + router_sid = box.session.id() \
> +end
> +
> +_ = test_run:switch('router_1')
> +vshard.router.map_callrw('save_router_sid', {}, big_timeout_opts)
> +
> +_ = test_run:switch('storage_1_a')
> +lref.add(1, router_sid, big_timeout)
> +
> +_ = test_run:switch('router_1')
> +vshard.router.internal.ref_id = 1
> +ok, err = vshard.router.map_callrw('echo', {1}, big_timeout_opts)
> +assert(not ok and err.message)
> +
> +_ = test_run:switch('storage_1_a')
> +_ = lref.del(1, router_sid)
> +
> +_ = test_run:switch('storage_2_a')
> +test_run:wait_cond(function() return lref.count == 0 end)
> +lref.add(1, router_sid, big_timeout)
> +
> +_ = test_run:switch('router_1')
> +vshard.router.internal.ref_id = 1
> +ok, err = vshard.router.map_callrw('echo', {1}, big_timeout_opts)
> +assert(not ok and err.message)
> +
> +_ = test_run:switch('storage_2_a')
> +_ = lref.del(1, router_sid)
> +
> +_ = test_run:switch('storage_1_a')
> +test_run:wait_cond(function() return lref.count == 0 end)
> +
> +--
> +-- Fail if some buckets are not visible, even if all the known replicasets
> +-- were scanned. It means a consistency violation.
> +--
> +_ = test_run:switch('storage_1_a')
> +bucket_id = box.space._bucket.index.pk:min().id
> +vshard.storage.bucket_force_drop(bucket_id)
> +
> +_ = test_run:switch('router_1')
> +ok, err = vshard.router.map_callrw('echo', {1}, big_timeout_opts)
> +assert(not ok and err.message)
> +
> +_ = test_run:switch('storage_1_a')
> +test_run:wait_cond(function() return lref.count == 0 end)
> +vshard.storage.bucket_force_create(bucket_id)
> +
> +_ = test_run:switch('storage_2_a')
> +test_run:wait_cond(function() return lref.count == 0 end)
> +bucket_id = box.space._bucket.index.pk:min().id
> +vshard.storage.bucket_force_drop(bucket_id)
> +
> +_ = test_run:switch('router_1')
> +ok, err = vshard.router.map_callrw('echo', {1}, big_timeout_opts)
> +assert(not ok and err.message)
> +
> +_ = test_run:switch('storage_2_a')
> +test_run:wait_cond(function() return lref.count == 0 end)
> +vshard.storage.bucket_force_create(bucket_id)
> +
> +_ = test_run:switch('storage_1_a')
> +test_run:wait_cond(function() return lref.count == 0 end)
> +
> +--
> +-- Storage map unit tests.
> +--
> +
> +-- Map fails when it is not able to use the ref.
> +ok, err = vshard.storage._call('storage_map', 0, 'echo', {1})
> +ok, err.message
> +
> +-- Map fails and clears the ref when the user function fails.
> +vshard.storage._call('storage_ref', 0, big_timeout)
> +assert(lref.count == 1)
> +ok, err = vshard.storage._call('storage_map', 0, 'raise_client_error', {})
> +assert(lref.count == 0)
> +assert(not ok and err.message)
> +
> +-- Map fails gracefully when it couldn't delete the ref.
> +vshard.storage._call('storage_ref', 0, big_timeout)
> +ok, err = vshard.storage._call('storage_map', 0, 'vshard.storage._call', \
> + {'storage_unref', 0})
> +assert(lref.count == 0)
> +assert(not ok and err.message)
> +
> +--
> +-- Map fail is handled and the router tries to send unrefs.
> +--
> +_ = test_run:switch('storage_1_a')
> +need_throw = true
> +function map_throw() \
> + if need_throw then \
> + raise_client_error() \
> + end \
> + return '+' \
> +end
> +
> +_ = test_run:switch('storage_2_a')
> +need_throw = false
> +function map_throw() \
> + if need_throw then \
> + raise_client_error() \
> + end \
> + return '+' \
> +end
> +
> +_ = test_run:switch('router_1')
> +ok, err = vshard.router.map_callrw('raise_client_error', {}, big_timeout_opts)
> +ok, err.message
> +
> +_ = test_run:switch('storage_1_a')
> +need_throw = false
> +test_run:wait_cond(function() return lref.count == 0 end)
> +
> +_ = test_run:switch('storage_2_a')
> +need_throw = true
> +test_run:wait_cond(function() return lref.count == 0 end)
> +
> +_ = test_run:switch('router_1')
> +ok, err = vshard.router.map_callrw('raise_client_error', {}, big_timeout_opts)
> +ok, err.message
> +
> +_ = test_run:switch('storage_1_a')
> +test_run:wait_cond(function() return lref.count == 0 end)
> +
> +_ = test_run:switch('storage_2_a')
> +test_run:wait_cond(function() return lref.count == 0 end)
> +
> +_ = test_run:switch('default')
> +_ = test_run:cmd("stop server router_1")
> +_ = test_run:cmd("cleanup server router_1")
> +test_run:drop_cluster(REPLICASET_1)
> +test_run:drop_cluster(REPLICASET_2)
> +_ = test_run:cmd('clear filter')
> \ No newline at end of file
> diff --git a/test/router/router.result b/test/router/router.result
> index 3c1d073..f9ee37c 100644
> --- a/test/router/router.result
> +++ b/test/router/router.result
> @@ -1163,14 +1163,15 @@ error_messages
> - - Use replicaset:callro(...) instead of replicaset.callro(...)
> - Use replicaset:connect_master(...) instead of replicaset.connect_master(...)
> - Use replicaset:callre(...) instead of replicaset.callre(...)
> - - Use replicaset:connect_replica(...) instead of replicaset.connect_replica(...)
> - Use replicaset:down_replica_priority(...) instead of replicaset.down_replica_priority(...)
> - - Use replicaset:callrw(...) instead of replicaset.callrw(...)
> + - Use replicaset:connect(...) instead of replicaset.connect(...)
> + - Use replicaset:wait_connected(...) instead of replicaset.wait_connected(...)
> + - Use replicaset:up_replica_priority(...) instead of replicaset.up_replica_priority(...)
> - Use replicaset:callbro(...) instead of replicaset.callbro(...)
> - Use replicaset:connect_all(...) instead of replicaset.connect_all(...)
> + - Use replicaset:connect_replica(...) instead of replicaset.connect_replica(...)
> - Use replicaset:call(...) instead of replicaset.call(...)
> - - Use replicaset:connect(...) instead of replicaset.connect(...)
> - - Use replicaset:up_replica_priority(...) instead of replicaset.up_replica_priority(...)
> + - Use replicaset:callrw(...) instead of replicaset.callrw(...)
> - Use replicaset:callbre(...) instead of replicaset.callbre(...)
> ...
> _, replica = next(replicaset.replicas)
> diff --git a/test/upgrade/upgrade.result b/test/upgrade/upgrade.result
> index c2d54a3..833da3f 100644
> --- a/test/upgrade/upgrade.result
> +++ b/test/upgrade/upgrade.result
> @@ -162,9 +162,12 @@ vshard.storage._call ~= nil
> vshard.storage._call('test_api', 1, 2, 3)
> | ---
> | - bucket_recv: true
> + | storage_ref: true
> | rebalancer_apply_routes: true
> - | test_api: true
> + | storage_map: true
> | rebalancer_request_state: true
> + | test_api: true
> + | storage_unref: true
> | - 1
> | - 2
> | - 3
> diff --git a/vshard/replicaset.lua b/vshard/replicaset.lua
> index 7437e3b..56ea165 100644
> --- a/vshard/replicaset.lua
> +++ b/vshard/replicaset.lua
> @@ -139,6 +139,39 @@ local function replicaset_connect_master(replicaset)
> return replicaset_connect_to_replica(replicaset, master)
> end
>
> +--
> +-- Wait until the master instance is connected. This is necessary at least for
> +-- async requests because they fail immediately if the connection is not
> +-- established.
> +-- Returns the remaining timeout because it is expected to be used to connect
> +-- to many replicasets in a loop, where such a return saves one clock get in
> +-- the caller code and is simply cleaner.
> +--
> +local function replicaset_wait_connected(replicaset, timeout)
> + local deadline = fiber_clock() + timeout
> + local ok, res
> + while true do
> + local conn = replicaset_connect_master(replicaset)
> + if conn.state == 'active' then
> + return timeout
> + end
Why don't you use conn:is_connected()? It considers "fetch_schema"
an appropriate state as well.
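Something like this, I suppose (a sketch, assuming is_connected()
is available in the netbox versions vshard supports):

```Lua
local conn = replicaset_connect_master(replicaset)
-- is_connected() also accepts the 'fetch_schema' state, so requests
-- are not delayed while the schema is still being fetched.
if conn:is_connected() then
    return timeout
end
```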
> + -- Netbox uses fiber_cond inside, which throws an irrelevant usage error
> + -- at negative timeout. Need to check the case manually.
> + if timeout < 0 then
> + return nil, lerror.timeout()
> + end
> + ok, res = pcall(conn.wait_connected, conn, timeout)
> + if not ok then
> + return nil, lerror.make(res)
> + end
> + if not res then
> + return nil, lerror.timeout()
> + end
> + timeout = deadline - fiber_clock()
> + end
> + assert(false)
> +end
> +
> --
> -- Create net.box connections to all replicas and master.
> --
> @@ -483,6 +516,7 @@ local replicaset_mt = {
> connect_replica = replicaset_connect_to_replica;
> down_replica_priority = replicaset_down_replica_priority;
> up_replica_priority = replicaset_up_replica_priority;
> + wait_connected = replicaset_wait_connected,
> call = replicaset_master_call;
> callrw = replicaset_master_call;
> callro = replicaset_template_multicallro(false, false);
> diff --git a/vshard/router/init.lua b/vshard/router/init.lua
> index 97bcb0a..8abd77f 100644
> --- a/vshard/router/init.lua
> +++ b/vshard/router/init.lua
> @@ -44,6 +44,11 @@ if not M then
> module_version = 0,
> -- Number of router which require collecting lua garbage.
> collect_lua_garbage_cnt = 0,
> +
> + ----------------------- Map-Reduce -----------------------
> + -- Storage Ref ID. It must be unique for each ref request
> + -- and therefore is global and monotonically growing.
> + ref_id = 0,
Maybe 0ULL?
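A plain Lua number is a double and loses exact integer increments
past 2^53, so a counter which only grows could in theory start
colliding. A LuaJIT uint64 cdata would stay exact (a sketch):

```Lua
-- 0ULL is a uint64_t cdata; 'M.ref_id + 1' then stays exact over
-- the whole 64-bit range, unlike a plain Lua number.
ref_id = 0ULL,
```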
> }
> end
>
> @@ -674,6 +679,177 @@ local function router_call(router, bucket_id, opts, ...)
> ...)
> end
>
> +local router_map_callrw
> +
> +if util.version_is_at_least(1, 10, 0) then
> +--
> +-- Consistent Map-Reduce. The given function is called on all masters in the
> +-- cluster with a guarantee that in case of success it was executed with all
> +-- buckets being accessible for reads and writes.
> +--
> +-- Consistency in scope of map-reduce means all the data was accessible, and
> +-- didn't move during map requests execution. To preserve the consistency there
> +-- is a third stage - Ref. So the algorithm is actually Ref-Map-Reduce.
> +--
> +-- Refs are broadcast before Map stage to pin the buckets to their storages, and
> +-- ensure they won't move until maps are done.
> +--
> +-- Map requests are broadcast in case all refs are done successfully. They
> +-- execute the user function + delete the refs to enable rebalancing again.
> +--
> +-- On the storages there are additional means to ensure map-reduces don't block
> +-- rebalancing forever and vice versa.
> +--
> +-- The function is not as slow as it may seem - it uses netbox's feature
> +-- is_async to send refs and maps in parallel. So cost of the function is about
> +-- 2 network exchanges to the most far storage in terms of time.
> +--
> +-- @param router Router instance to use.
> +-- @param func Name of the function to call.
> +-- @param args Function arguments passed in netbox style (as an array).
> +-- @param opts Can only contain 'timeout' as a number of seconds. Note that the
> +-- refs may end up being kept on the storages during this entire timeout if
> +-- something goes wrong. For instance, network issues appear. This means
> +-- better not use a value bigger than necessary. A stuck infinite ref can
> +-- only be dropped by this router restart/reconnect or the storage restart.
> +--
> +-- @return In case of success - a map with replicaset UUID keys and values being
> +-- what the function returned from the replicaset.
> +--
> +-- @return In case of an error - nil, error object, optional UUID of the
> +-- replicaset where the error happened. UUID may be not present if it wasn't
> +-- about concrete replicaset. For example, not all buckets were found even
> +-- though all replicasets were scanned.
> +--
> +router_map_callrw = function(router, func, args, opts)
> + local replicasets = router.replicasets
It would be great to filter out replicasets with bucket_count = 0 and
weight = 0 here.
If such "dummy" replicasets are disabled, we get a "connection refused"
error.
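Something like this, maybe (an untested sketch; `rs.weight` and
`rs.bucket_count` are assumed fields on the replicaset object and may not
match the actual internals):

```lua
for uuid, rs in pairs(replicasets) do
    -- Hypothetical filter: skip replicasets that cannot own buckets,
    -- so a disabled "dummy" replicaset does not fail the whole request.
    if rs.weight ~= 0 or rs.bucket_count ~= 0 then
        timeout, err = rs:wait_connected(timeout)
        -- ... the ref request as in the patch ...
    end
end
```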
> + local timeout = opts and opts.timeout or consts.CALL_TIMEOUT_MIN
> + local deadline = fiber_clock() + timeout
> + local err, err_uuid, res, ok, map
> + local futures = {}
> + local bucket_count = 0
> + local opts_async = {is_async = true}
> + local rs_count = 0
> + local rid = M.ref_id
> + M.ref_id = rid + 1
> + -- Nil checks are done explicitly here (== nil instead of 'not'), because
> + -- netbox requests return box.NULL instead of nils.
> +
> + --
> + -- Ref stage: send.
> + --
> + for uuid, rs in pairs(replicasets) do
> + -- Netbox async requests work only with active connections. Need to wait
> + -- for the connection explicitly.
> + timeout, err = rs:wait_connected(timeout)
> + if timeout == nil then
> + err_uuid = uuid
> + goto fail
> + end
> + res, err = rs:callrw('vshard.storage._call',
> + {'storage_ref', rid, timeout}, opts_async)
> + if res == nil then
> + err_uuid = uuid
> + goto fail
> + end
> + futures[uuid] = res
> + rs_count = rs_count + 1
> + end
> + map = table_new(0, rs_count)
> + --
> + -- Ref stage: collect.
> + --
> + for uuid, future in pairs(futures) do
> + res, err = future:wait_result(timeout)
> + -- Handle netbox error first.
> + if res == nil then
> + err_uuid = uuid
> + goto fail
> + end
> + -- Ref returns nil,err or bucket count.
> + res, err = unpack(res)
Seems `res, err = res[1], res[2]` could be a bit faster.
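For reference, both forms give the same pair for a two-element netbox-style
result; direct indexing just avoids the extra `unpack` call (a standalone
sketch):

```lua
-- A netbox-style result: {value, error}.
local res = {3, "some-error"}

-- Via unpack:
local a, b = unpack(res)
-- Via direct indexing, no function call:
local c, d = res[1], res[2]

assert(a == c and b == d)   -- both forms yield the same pair
```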
> + if res == nil then
> + err_uuid = uuid
> + goto fail
> + end
> + bucket_count = bucket_count + res
> + timeout = deadline - fiber_clock()
> + end
> + -- All refs are done but not all buckets are covered. This is odd and can
> + -- mean many things. The most possible ones: 1) outdated configuration on
> + -- the router and it does not see another replicaset with more buckets,
> + -- 2) some buckets are simply lost or duplicated - could happen as a bug, or
> + -- if the user does a maintenance of some kind by creating/deleting buckets.
> + -- In both cases can't guarantee all the data would be covered by Map calls.
> + if bucket_count ~= router.total_bucket_count then
> + err = lerror.vshard(lerror.code.UNKNOWN_BUCKETS,
> + router.total_bucket_count - bucket_count)
> + goto fail
> + end
> + --
> + -- Map stage: send.
> + --
> + args = {'storage_map', rid, func, args}
> + for uuid, rs in pairs(replicasets) do
> + res, err = rs:callrw('vshard.storage._call', args, opts_async)
> + if res == nil then
> + err_uuid = uuid
> + goto fail
> + end
> + futures[uuid] = res
> + end
> + --
> + -- Map stage: collect.
> + --
> + for uuid, f in pairs(futures) do
> + res, err = f:wait_result(timeout)
> + if res == nil then
> + err_uuid = uuid
> + goto fail
> + end
> + -- Map returns true,res or nil,err.
> + ok, res = unpack(res)
> + if ok == nil then
> + err = res
> + err_uuid = uuid
> + goto fail
> + end
> + if res ~= nil then
> + -- Store as a table so in future it could be extended for
> + -- multireturn.
> + map[uuid] = {res}
> + end
> + timeout = deadline - fiber_clock()
> + end
> + do return map end
> +
> +::fail::
> + for uuid, f in pairs(futures) do
> + f:discard()
> + -- Best effort to remove the created refs before exiting. Can help if
> + -- the timeout was big and the error happened early.
> + f = replicasets[uuid]:callrw('vshard.storage._call',
> + {'storage_unref', rid}, opts_async)
> + if f ~= nil then
> + -- Don't care waiting for a result - no time for this. But it won't
> + -- affect the request sending if the connection is still alive.
> + f:discard()
> + end
> + end
> + err = lerror.make(err)
> + return nil, err, err_uuid
> +end
> +
> +-- Version >= 1.10.
> +else
> +-- Version < 1.10.
> +
> +router_map_callrw = function()
> + error('Supported for Tarantool >= 1.10')
> +end
> +
> +end
> +
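Putting the docbot request above together, a typical call site would look
roughly like this (a sketch assuming a configured router, a `test` space,
and a user function `get_count` defined globally on the storages - all
names here are illustrative):

```lua
local log = require('log')

-- On each storage: a global function so netbox can call it by name.
function get_count(space_name)
    return box.space[space_name]:count()
end

-- On the router: run it on every master under one consistent snapshot
-- of bucket ownership.
local res, err, uuid = vshard.router.map_callrw('get_count', {'test'},
                                                {timeout = 10})
if res == nil then
    -- uuid is optional: present only when the error is tied to one
    -- concrete replicaset.
    log.error('Map-reduce failed on %s: %s', uuid, err)
else
    local total = 0
    for _, ret in pairs(res) do
        -- Each value is wrapped into a table for future multireturn.
        total = total + ret[1]
    end
    log.info('Total count: %d', total)
end
```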
> --
> -- Get replicaset object by bucket identifier.
> -- @param bucket_id Bucket identifier.
> @@ -1268,6 +1444,7 @@ local router_mt = {
> callrw = router_callrw;
> callre = router_callre;
> callbre = router_callbre;
> + map_callrw = router_map_callrw,
> route = router_route;
> routeall = router_routeall;
> bucket_id = router_bucket_id,
> @@ -1365,6 +1542,9 @@ end
> if not rawget(_G, MODULE_INTERNALS) then
> rawset(_G, MODULE_INTERNALS, M)
> else
> + if not M.ref_id then
> + M.ref_id = 0
> + end
> for _, router in pairs(M.routers) do
> router_cfg(router, router.current_cfg, true)
> setmetatable(router, router_mt)
> diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
> index 31f668f..0a14440 100644
> --- a/vshard/storage/init.lua
> +++ b/vshard/storage/init.lua
> @@ -2415,6 +2415,50 @@ local function storage_call(bucket_id, mode, name, args)
> return ok, ret1, ret2, ret3
> end
>
> +--
> +-- Bind a new storage ref to the current box session. Is used as a part of
> +-- Map-Reduce API.
> +--
> +local function storage_ref(rid, timeout)
> + local ok, err = lref.add(rid, box.session.id(), timeout)
> + if not ok then
> + return nil, err
> + end
> + return bucket_count()
> +end
> +
> +--
> +-- Drop a storage ref from the current box session. Is used as a part of
> +-- Map-Reduce API.
> +--
> +local function storage_unref(rid)
> + return lref.del(rid, box.session.id())
> +end
> +
> +--
> +-- Execute a user's function under an infinite storage ref protecting from
> +-- bucket moves. The ref should exist before, and is deleted after, regardless
> +-- of the function result. Is used as a part of Map-Reduce API.
> +--
> +local function storage_map(rid, name, args)
> + local ok, err, res
> + local sid = box.session.id()
> + ok, err = lref.use(rid, sid)
> + if not ok then
> + return nil, err
> + end
> + ok, res = local_call(name, args)
> + if not ok then
> + lref.del(rid, sid)
> + return nil, lerror.make(res)
> + end
> + ok, err = lref.del(rid, sid)
> + if not ok then
> + return nil, err
> + end
> + return true, res
> +end
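So, per replicaset, the wire protocol boils down to at most three
`vshard.storage._call` invocations issued by the router (a schematic
sequence; `conn` stands for a netbox connection to the master, and the
variables mirror the ones in the patch):

```lua
-- 1. Pin all buckets on this storage to the session for up to `timeout`
--    seconds; returns the pinned bucket count on success.
local count, err = conn:call('vshard.storage._call',
                             {'storage_ref', rid, timeout})
-- 2. Run the user function under the ref; the ref is deleted afterwards
--    regardless of the outcome. Returns true,res or nil,err.
local ok, res = conn:call('vshard.storage._call',
                          {'storage_map', rid, func, args})
-- 3. Only on early failure: best-effort cleanup of a ref that was taken
--    but never consumed by a map.
conn:call('vshard.storage._call', {'storage_unref', rid})
```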
> +
> local service_call_api
>
> local function service_call_test_api(...)
> @@ -2425,6 +2469,9 @@ service_call_api = setmetatable({
> bucket_recv = bucket_recv,
> rebalancer_apply_routes = rebalancer_apply_routes,
> rebalancer_request_state = rebalancer_request_state,
> + storage_ref = storage_ref,
> + storage_unref = storage_unref,
> + storage_map = storage_map,
> test_api = service_call_test_api,
> }, {__serialize = function(api)
> local res = {}