From: Oleg Babin via Tarantool-patches <tarantool-patches@dev.tarantool.org> To: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>, tarantool-patches@dev.tarantool.org, yaroslav.dynnikov@tarantool.org Subject: Re: [Tarantool-patches] [PATCH vshard 11/11] router: introduce map_callrw() Date: Wed, 24 Feb 2021 13:28:17 +0300 Message-ID: <837d2697-5165-23d6-72bf-f7533af10864@tarantool.org> In-Reply-To: <f799cf4c753c0e271cfa9309aac4cd33a6a65aaf.1614039039.git.v.shpilevoy@tarantool.org> Thanks a lot for your patch! See 5 comments below. On 23.02.2021 03:15, Vladislav Shpilevoy wrote: > Closes #147 Will read-only map-reduce functions be done in the scope of a separate issue/patch? I know about #173, but it seems we need to keep information about a map_callro function. > @TarantoolBot document > Title: vshard.router.map_callrw() > > `vshard.router.map_callrw()` implements consistent map-reduce over > the entire cluster. Consistency means all the data was accessible > and didn't move during map request execution. > > It is useful when you need to access potentially all the data in the > cluster, or simply a huge number of buckets scattered over the > instances, for which individual `vshard.router.call()`s would take > too long. > > `map_callrw()` takes the name of the function to call on the storages, > the arguments in the format of an array, and an optional options map. > The only supported option for now is timeout, which is applied to > the entire call, not to individual calls for each storage. > ``` > vshard.router.map_callrw(func_name, args[, {timeout = <seconds>}]) > ``` > > The chosen function is called on the master node of each > replicaset with the given arguments. 
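Maybe the doc request could also carry a short end-to-end sketch. Everything below follows only the description in this patch; the storage function `count_rows` and the space `customers` are invented for illustration:

```lua
-- On every storage master (the Map part): any globally visible
-- function works; it receives the arguments passed to map_callrw().
function count_rows()
    return box.space.customers:count()
end

-- On the router: Ref and Map run inside map_callrw(); Reduce is
-- plain user code over the returned {uuid = {res}} map.
local log = require('log')
local res, err, uuid =
    vshard.router.map_callrw('count_rows', {}, {timeout = 10})
if res == nil then
    -- 'uuid' is set only when the error belongs to a concrete replicaset.
    log.error('map_callrw failed: %s (replicaset: %s)', err, uuid)
else
    local total = 0
    for _, values in pairs(res) do
        total = total + values[1] -- each result is wrapped: {res1}.
    end
    log.info('total rows: %d', total)
end
```

The Reduce stage here is just the final loop - vshard itself never aggregates.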
> > In case of success `vshard.router.map_callrw()` returns a map with > replicaset UUIDs as keys and results of the user's function as > values, like this: > ``` > {uuid1 = {res1}, uuid2 = {res2}, ...} > ``` > If the function returned `nil` or `box.NULL` from one of the > storages, it won't be present in the result map. > > In case of fail it returns nil, error object, and optional > replicaset UUID where the error happened. UUID may not be returned > if the error wasn't about a concrete replicaset. > > For instance, the method fails if not all buckets were found even > if all replicasets were scanned successfully. > > Handling the result looks like this: > ```Lua > res, err, uuid = vshard.router.map_callrw(...) > if not res then > -- Error. > -- 'err' - error object. 'uuid' - optional UUID of replicaset > -- where the error happened. > ... > else > -- Success. > for uuid, value in pairs(res) do > ... > end > end > ``` > > Map-Reduce in vshard works in 3 stages: Ref, Map, Reduce. Ref is > an internal stage which is supposed to ensure data consistency > during user's function execution on all nodes. > > Reduce is not performed by vshard. It is what user's code does > with results of `map_callrw()`. > > Consistency, as it is defined for map-reduce, is not compatible > with rebalancing. Because any bucket move would make the sender > and receiver nodes 'inconsistent' - it is not possible to call a > function on them which could simply access all the data without > doing `vshard.storage.bucket_ref()`. > > This makes Ref stage very intricate as it must work together with > rebalancer to ensure neither of them block each other. > > For this storage has a scheduler specifically for bucket moves and > storage refs which shares storage time between them fairly. > > Definition of fairness depends on how long and frequent the moves > and refs are. This can be configured using storage options > `sched_move_quota` and `sched_ref_quota`. 
See more details about > them in the corresponding doc section. > > The scheduler configuration may affect map-reduce requests if they > are used a lot during rebalancing. > > Keep in mind that it is not a good idea to use too big timeouts > for `map_callrw()`. Because the router will try to block the > bucket moves for the given timeout on all storages. And in case > something will go wrong, the block will remain for the entire > timeout. This means, in particular, having the timeout longer > than, say, minutes is a super bad way to go unless it is for > tests only. > > Also it is important to remember that `map_callrw()` does not > work on replicas. It works only on masters. This makes it unusable > if at least one replicaset has its master node down. > --- > test/router/map-reduce.result | 636 ++++++++++++++++++++++++++++++++ > test/router/map-reduce.test.lua | 258 +++++++++++++ > test/router/router.result | 9 +- > test/upgrade/upgrade.result | 5 +- > vshard/replicaset.lua | 34 ++ > vshard/router/init.lua | 180 +++++++++ > vshard/storage/init.lua | 47 +++ > 7 files changed, 1164 insertions(+), 5 deletions(-) > create mode 100644 test/router/map-reduce.result > create mode 100644 test/router/map-reduce.test.lua > > diff --git a/test/router/map-reduce.result b/test/router/map-reduce.result > new file mode 100644 > index 0000000..1e8995a > --- /dev/null > +++ b/test/router/map-reduce.result > @@ -0,0 +1,636 @@ > +-- test-run result file version 2 > +test_run = require('test_run').new() > + | --- > + | ... > +REPLICASET_1 = { 'storage_1_a', 'storage_1_b' } > + | --- > + | ... > +REPLICASET_2 = { 'storage_2_a', 'storage_2_b' } > + | --- > + | ... > +test_run:create_cluster(REPLICASET_1, 'router') > + | --- > + | ... > +test_run:create_cluster(REPLICASET_2, 'router') > + | --- > + | ... > +util = require('util') > + | --- > + | ... > +util.wait_master(test_run, REPLICASET_1, 'storage_1_a') > + | --- > + | ... 
> +util.wait_master(test_run, REPLICASET_2, 'storage_2_a') > + | --- > + | ... > +util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, 'bootstrap_storage()') > + | --- > + | ... > +util.push_rs_filters(test_run) > + | --- > + | ... > +_ = test_run:cmd("create server router_1 with script='router/router_1.lua'") > + | --- > + | ... > +_ = test_run:cmd("start server router_1") > + | --- > + | ... > + > +_ = test_run:switch("router_1") > + | --- > + | ... > +util = require('util') > + | --- > + | ... > + > +-- > +-- gh-147: consistent map-reduce. > +-- > +big_timeout = 1000000 > + | --- > + | ... > +big_timeout_opts = {timeout = big_timeout} > + | --- > + | ... > +vshard.router.cfg(cfg) > + | --- > + | ... > +vshard.router.bootstrap(big_timeout_opts) > + | --- > + | - true > + | ... > +-- Trivial basic sanity test. Multireturn is not supported, should be truncated. > +vshard.router.map_callrw('echo', {1, 2, 3}, big_timeout_opts) > + | --- > + | - <replicaset_2>: > + | - 1 > + | <replicaset_1>: > + | - 1 > + | ... > + > +-- > +-- Fail during connecting to storages. For the succeeded storages the router > +-- tries to send unref. > +-- > +timeout = 0.001 > + | --- > + | ... > +timeout_opts = {timeout = timeout} > + | --- > + | ... > + > +test_run:cmd('stop server storage_1_a') > + | --- > + | - true > + | ... > +ok, err = vshard.router.map_callrw('echo', {1}, timeout_opts) > + | --- > + | ... > +assert(not ok and err.message) > + | --- > + | - Timeout exceeded > + | ... > +-- Even if ref was sent successfully to storage_2_a, it was deleted before > +-- router returned an error. > +_ = test_run:switch('storage_2_a') > + | --- > + | ... > +lref = require('vshard.storage.ref') > + | --- > + | ... > +-- Wait because unref is sent asynchronously. Could arrive not immediately. > +test_run:wait_cond(function() return lref.count == 0 end) > + | --- > + | - true > + | ... > + > +_ = test_run:switch('router_1') > + | --- > + | ... 
> +test_run:cmd('start server storage_1_a') > + | --- > + | - true > + | ... > +-- Works again - router waited for connection being established. > +vshard.router.map_callrw('echo', {1}, big_timeout_opts) > + | --- > + | - <replicaset_2>: > + | - 1 > + | <replicaset_1>: > + | - 1 > + | ... > + > +-- > +-- Do all the same but with another storage being stopped. The same test is done > +-- again because can't tell at which of the tests to where the router will go > +-- first. > +-- > +test_run:cmd('stop server storage_2_a') > + | --- > + | - true > + | ... > +ok, err = vshard.router.map_callrw('echo', {1}, timeout_opts) > + | --- > + | ... > +assert(not ok and err.message) > + | --- > + | - Timeout exceeded > + | ... > +_ = test_run:switch('storage_1_a') > + | --- > + | ... > +lref = require('vshard.storage.ref') > + | --- > + | ... > +test_run:wait_cond(function() return lref.count == 0 end) > + | --- > + | - true > + | ... > + > +_ = test_run:switch('router_1') > + | --- > + | ... > +test_run:cmd('start server storage_2_a') > + | --- > + | - true > + | ... > +vshard.router.map_callrw('echo', {1}, big_timeout_opts) > + | --- > + | - <replicaset_2>: > + | - 1 > + | <replicaset_1>: > + | - 1 > + | ... > + > +-- > +-- Fail at ref stage handling. Unrefs are sent to cancel those refs which > +-- succeeded. To simulate a ref fail make the router think there is a moving > +-- bucket. > +-- > +_ = test_run:switch('storage_1_a') > + | --- > + | ... > +lsched = require('vshard.storage.sched') > + | --- > + | ... > +big_timeout = 1000000 > + | --- > + | ... > +lsched.move_start(big_timeout) > + | --- > + | - 1000000 > + | ... > + > +_ = test_run:switch('router_1') > + | --- > + | ... > +ok, err = vshard.router.map_callrw('echo', {1}, timeout_opts) > + | --- > + | ... > +assert(not ok and err.message) > + | --- > + | - Timeout exceeded > + | ... > + > +_ = test_run:switch('storage_2_a') > + | --- > + | ... > +lsched = require('vshard.storage.sched') > + | --- > + | ... 
> +lref = require('vshard.storage.ref') > + | --- > + | ... > +test_run:wait_cond(function() return lref.count == 0 end) > + | --- > + | - true > + | ... > + > +-- > +-- Do all the same with another storage being busy with a 'move'. > +-- > +big_timeout = 1000000 > + | --- > + | ... > +lsched.move_start(big_timeout) > + | --- > + | - 1000000 > + | ... > + > +_ = test_run:switch('storage_1_a') > + | --- > + | ... > +lref = require('vshard.storage.ref') > + | --- > + | ... > +lsched.move_end(1) > + | --- > + | ... > +assert(lref.count == 0) > + | --- > + | - true > + | ... > + > +_ = test_run:switch('router_1') > + | --- > + | ... > +ok, err = vshard.router.map_callrw('echo', {1}, timeout_opts) > + | --- > + | ... > +assert(not ok and err.message) > + | --- > + | - Timeout exceeded > + | ... > + > +_ = test_run:switch('storage_1_a') > + | --- > + | ... > +test_run:wait_cond(function() return lref.count == 0 end) > + | --- > + | - true > + | ... > + > +_ = test_run:switch('storage_2_a') > + | --- > + | ... > +lsched.move_end(1) > + | --- > + | ... > +assert(lref.count == 0) > + | --- > + | - true > + | ... > + > +_ = test_run:switch('router_1') > + | --- > + | ... > +vshard.router.map_callrw('echo', {1}, big_timeout_opts) > + | --- > + | - <replicaset_2>: > + | - 1 > + | <replicaset_1>: > + | - 1 > + | ... > + > +-- > +-- Ref can fail earlier than by a timeout. Router still should broadcast unrefs > +-- correctly. To simulate ref fail add a duplicate manually. > +-- > +_ = test_run:switch('storage_1_a') > + | --- > + | ... > +box.schema.user.grant('storage', 'super') > + | --- > + | ... > +router_sid = nil > + | --- > + | ... > +function save_router_sid() \ > + router_sid = box.session.id() \ > +end > + | --- > + | ... > + > +_ = test_run:switch('storage_2_a') > + | --- > + | ... > +box.schema.user.grant('storage', 'super') > + | --- > + | ... > +router_sid = nil > + | --- > + | ... 
> +function save_router_sid() \ > + router_sid = box.session.id() \ > +end > + | --- > + | ... > + > +_ = test_run:switch('router_1') > + | --- > + | ... > +vshard.router.map_callrw('save_router_sid', {}, big_timeout_opts) > + | --- > + | - [] > + | ... > + > +_ = test_run:switch('storage_1_a') > + | --- > + | ... > +lref.add(1, router_sid, big_timeout) > + | --- > + | - true > + | ... > + > +_ = test_run:switch('router_1') > + | --- > + | ... > +vshard.router.internal.ref_id = 1 > + | --- > + | ... > +ok, err = vshard.router.map_callrw('echo', {1}, big_timeout_opts) > + | --- > + | ... > +assert(not ok and err.message) > + | --- > + | - 'Can not add a storage ref: duplicate ref' > + | ... > + > +_ = test_run:switch('storage_1_a') > + | --- > + | ... > +_ = lref.del(1, router_sid) > + | --- > + | ... > + > +_ = test_run:switch('storage_2_a') > + | --- > + | ... > +test_run:wait_cond(function() return lref.count == 0 end) > + | --- > + | - true > + | ... > +lref.add(1, router_sid, big_timeout) > + | --- > + | - true > + | ... > + > +_ = test_run:switch('router_1') > + | --- > + | ... > +vshard.router.internal.ref_id = 1 > + | --- > + | ... > +ok, err = vshard.router.map_callrw('echo', {1}, big_timeout_opts) > + | --- > + | ... > +assert(not ok and err.message) > + | --- > + | - 'Can not add a storage ref: duplicate ref' > + | ... > + > +_ = test_run:switch('storage_2_a') > + | --- > + | ... > +_ = lref.del(1, router_sid) > + | --- > + | ... > + > +_ = test_run:switch('storage_1_a') > + | --- > + | ... > +test_run:wait_cond(function() return lref.count == 0 end) > + | --- > + | - true > + | ... > + > +-- > +-- Fail if some buckets are not visible. Even if all the known replicasets were > +-- scanned. It means consistency violation. > +-- > +_ = test_run:switch('storage_1_a') > + | --- > + | ... > +bucket_id = box.space._bucket.index.pk:min().id > + | --- > + | ... > +vshard.storage.bucket_force_drop(bucket_id) > + | --- > + | - true > + | ... 
> + > +_ = test_run:switch('router_1') > + | --- > + | ... > +ok, err = vshard.router.map_callrw('echo', {1}, big_timeout_opts) > + | --- > + | ... > +assert(not ok and err.message) > + | --- > + | - 1 buckets are not discovered > + | ... > + > +_ = test_run:switch('storage_1_a') > + | --- > + | ... > +test_run:wait_cond(function() return lref.count == 0 end) > + | --- > + | - true > + | ... > +vshard.storage.bucket_force_create(bucket_id) > + | --- > + | - true > + | ... > + > +_ = test_run:switch('storage_2_a') > + | --- > + | ... > +test_run:wait_cond(function() return lref.count == 0 end) > + | --- > + | - true > + | ... > +bucket_id = box.space._bucket.index.pk:min().id > + | --- > + | ... > +vshard.storage.bucket_force_drop(bucket_id) > + | --- > + | - true > + | ... > + > +_ = test_run:switch('router_1') > + | --- > + | ... > +ok, err = vshard.router.map_callrw('echo', {1}, big_timeout_opts) > + | --- > + | ... > +assert(not ok and err.message) > + | --- > + | - 1 buckets are not discovered > + | ... > + > +_ = test_run:switch('storage_2_a') > + | --- > + | ... > +test_run:wait_cond(function() return lref.count == 0 end) > + | --- > + | - true > + | ... > +vshard.storage.bucket_force_create(bucket_id) > + | --- > + | - true > + | ... > + > +_ = test_run:switch('storage_1_a') > + | --- > + | ... > +test_run:wait_cond(function() return lref.count == 0 end) > + | --- > + | - true > + | ... > + > +-- > +-- Storage map unit tests. > +-- > + > +-- Map fails not being able to use the ref. > +ok, err = vshard.storage._call('storage_map', 0, 'echo', {1}) > + | --- > + | ... > +ok, err.message > + | --- > + | - null > + | - 'Can not use a storage ref: no session' > + | ... > + > +-- Map fails and clears the ref when the user function fails. > +vshard.storage._call('storage_ref', 0, big_timeout) > + | --- > + | - 1500 > + | ... > +assert(lref.count == 1) > + | --- > + | - true > + | ... 
> +ok, err = vshard.storage._call('storage_map', 0, 'raise_client_error', {}) > + | --- > + | ... > +assert(lref.count == 0) > + | --- > + | - true > + | ... > +assert(not ok and err.message) > + | --- > + | - Unknown error > + | ... > + > +-- Map fails gracefully when couldn't delete the ref. > +vshard.storage._call('storage_ref', 0, big_timeout) > + | --- > + | - 1500 > + | ... > +ok, err = vshard.storage._call('storage_map', 0, 'vshard.storage._call', \ > + {'storage_unref', 0}) > + | --- > + | ... > +assert(lref.count == 0) > + | --- > + | - true > + | ... > +assert(not ok and err.message) > + | --- > + | - 'Can not delete a storage ref: no ref' > + | ... > + > +-- > +-- Map fail is handled and the router tries to send unrefs. > +-- > +_ = test_run:switch('storage_1_a') > + | --- > + | ... > +need_throw = true > + | --- > + | ... > +function map_throw() \ > + if need_throw then \ > + raise_client_error() \ > + end \ > + return '+' \ > +end > + | --- > + | ... > + > +_ = test_run:switch('storage_2_a') > + | --- > + | ... > +need_throw = false > + | --- > + | ... > +function map_throw() \ > + if need_throw then \ > + raise_client_error() \ > + end \ > + return '+' \ > +end > + | --- > + | ... > + > +_ = test_run:switch('router_1') > + | --- > + | ... > +ok, err = vshard.router.map_callrw('raise_client_error', {}, big_timeout_opts) > + | --- > + | ... > +ok, err.message > + | --- > + | - null > + | - Unknown error > + | ... > + > +_ = test_run:switch('storage_1_a') > + | --- > + | ... > +need_throw = false > + | --- > + | ... > +test_run:wait_cond(function() return lref.count == 0 end) > + | --- > + | - true > + | ... > + > +_ = test_run:switch('storage_2_a') > + | --- > + | ... > +need_throw = true > + | --- > + | ... > +test_run:wait_cond(function() return lref.count == 0 end) > + | --- > + | - true > + | ... > + > +_ = test_run:switch('router_1') > + | --- > + | ... 
> +ok, err = vshard.router.map_callrw('raise_client_error', {}, big_timeout_opts) > + | --- > + | ... > +ok, err.message > + | --- > + | - null > + | - Unknown error > + | ... > + > +_ = test_run:switch('storage_1_a') > + | --- > + | ... > +test_run:wait_cond(function() return lref.count == 0 end) > + | --- > + | - true > + | ... > + > +_ = test_run:switch('storage_2_a') > + | --- > + | ... > +test_run:wait_cond(function() return lref.count == 0 end) > + | --- > + | - true > + | ... > + > +_ = test_run:switch('default') > + | --- > + | ... > +_ = test_run:cmd("stop server router_1") > + | --- > + | ... > +_ = test_run:cmd("cleanup server router_1") > + | --- > + | ... > +test_run:drop_cluster(REPLICASET_1) > + | --- > + | ... > +test_run:drop_cluster(REPLICASET_2) > + | --- > + | ... > +_ = test_run:cmd('clear filter') > + | --- > + | ... > diff --git a/test/router/map-reduce.test.lua b/test/router/map-reduce.test.lua > new file mode 100644 > index 0000000..3b63248 > --- /dev/null > +++ b/test/router/map-reduce.test.lua > @@ -0,0 +1,258 @@ > +test_run = require('test_run').new() > +REPLICASET_1 = { 'storage_1_a', 'storage_1_b' } > +REPLICASET_2 = { 'storage_2_a', 'storage_2_b' } > +test_run:create_cluster(REPLICASET_1, 'router') > +test_run:create_cluster(REPLICASET_2, 'router') > +util = require('util') > +util.wait_master(test_run, REPLICASET_1, 'storage_1_a') > +util.wait_master(test_run, REPLICASET_2, 'storage_2_a') > +util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, 'bootstrap_storage()') > +util.push_rs_filters(test_run) > +_ = test_run:cmd("create server router_1 with script='router/router_1.lua'") > +_ = test_run:cmd("start server router_1") > + > +_ = test_run:switch("router_1") > +util = require('util') > + > +-- > +-- gh-147: consistent map-reduce. > +-- > +big_timeout = 1000000 > +big_timeout_opts = {timeout = big_timeout} > +vshard.router.cfg(cfg) > +vshard.router.bootstrap(big_timeout_opts) > +-- Trivial basic sanity test. 
Multireturn is not supported, should be truncated. > +vshard.router.map_callrw('echo', {1, 2, 3}, big_timeout_opts) > + > +-- > +-- Fail during connecting to storages. For the succeeded storages the router > +-- tries to send unref. > +-- > +timeout = 0.001 > +timeout_opts = {timeout = timeout} > + > +test_run:cmd('stop server storage_1_a') > +ok, err = vshard.router.map_callrw('echo', {1}, timeout_opts) > +assert(not ok and err.message) > +-- Even if ref was sent successfully to storage_2_a, it was deleted before > +-- router returned an error. > +_ = test_run:switch('storage_2_a') > +lref = require('vshard.storage.ref') > +-- Wait because unref is sent asynchronously. Could arrive not immediately. > +test_run:wait_cond(function() return lref.count == 0 end) > + > +_ = test_run:switch('router_1') > +test_run:cmd('start server storage_1_a') > +-- Works again - router waited for connection being established. > +vshard.router.map_callrw('echo', {1}, big_timeout_opts) > + > +-- > +-- Do all the same but with another storage being stopped. The same test is done > +-- again because can't tell at which of the tests to where the router will go > +-- first. > +-- > +test_run:cmd('stop server storage_2_a') > +ok, err = vshard.router.map_callrw('echo', {1}, timeout_opts) > +assert(not ok and err.message) > +_ = test_run:switch('storage_1_a') > +lref = require('vshard.storage.ref') > +test_run:wait_cond(function() return lref.count == 0 end) > + > +_ = test_run:switch('router_1') > +test_run:cmd('start server storage_2_a') > +vshard.router.map_callrw('echo', {1}, big_timeout_opts) > + > +-- > +-- Fail at ref stage handling. Unrefs are sent to cancel those refs which > +-- succeeded. To simulate a ref fail make the router think there is a moving > +-- bucket. 
> +-- > +_ = test_run:switch('storage_1_a') > +lsched = require('vshard.storage.sched') > +big_timeout = 1000000 > +lsched.move_start(big_timeout) > + > +_ = test_run:switch('router_1') > +ok, err = vshard.router.map_callrw('echo', {1}, timeout_opts) > +assert(not ok and err.message) > + > +_ = test_run:switch('storage_2_a') > +lsched = require('vshard.storage.sched') > +lref = require('vshard.storage.ref') > +test_run:wait_cond(function() return lref.count == 0 end) > + > +-- > +-- Do all the same with another storage being busy with a 'move'. > +-- > +big_timeout = 1000000 > +lsched.move_start(big_timeout) > + > +_ = test_run:switch('storage_1_a') > +lref = require('vshard.storage.ref') > +lsched.move_end(1) > +assert(lref.count == 0) > + > +_ = test_run:switch('router_1') > +ok, err = vshard.router.map_callrw('echo', {1}, timeout_opts) > +assert(not ok and err.message) > + > +_ = test_run:switch('storage_1_a') > +test_run:wait_cond(function() return lref.count == 0 end) > + > +_ = test_run:switch('storage_2_a') > +lsched.move_end(1) > +assert(lref.count == 0) > + > +_ = test_run:switch('router_1') > +vshard.router.map_callrw('echo', {1}, big_timeout_opts) > + > +-- > +-- Ref can fail earlier than by a timeout. Router still should broadcast unrefs > +-- correctly. To simulate ref fail add a duplicate manually. 
> +-- > +_ = test_run:switch('storage_1_a') > +box.schema.user.grant('storage', 'super') > +router_sid = nil > +function save_router_sid() \ > + router_sid = box.session.id() \ > +end > + > +_ = test_run:switch('storage_2_a') > +box.schema.user.grant('storage', 'super') > +router_sid = nil > +function save_router_sid() \ > + router_sid = box.session.id() \ > +end > + > +_ = test_run:switch('router_1') > +vshard.router.map_callrw('save_router_sid', {}, big_timeout_opts) > + > +_ = test_run:switch('storage_1_a') > +lref.add(1, router_sid, big_timeout) > + > +_ = test_run:switch('router_1') > +vshard.router.internal.ref_id = 1 > +ok, err = vshard.router.map_callrw('echo', {1}, big_timeout_opts) > +assert(not ok and err.message) > + > +_ = test_run:switch('storage_1_a') > +_ = lref.del(1, router_sid) > + > +_ = test_run:switch('storage_2_a') > +test_run:wait_cond(function() return lref.count == 0 end) > +lref.add(1, router_sid, big_timeout) > + > +_ = test_run:switch('router_1') > +vshard.router.internal.ref_id = 1 > +ok, err = vshard.router.map_callrw('echo', {1}, big_timeout_opts) > +assert(not ok and err.message) > + > +_ = test_run:switch('storage_2_a') > +_ = lref.del(1, router_sid) > + > +_ = test_run:switch('storage_1_a') > +test_run:wait_cond(function() return lref.count == 0 end) > + > +-- > +-- Fail if some buckets are not visible. Even if all the known replicasets were > +-- scanned. It means consistency violation. 
> +-- > +_ = test_run:switch('storage_1_a') > +bucket_id = box.space._bucket.index.pk:min().id > +vshard.storage.bucket_force_drop(bucket_id) > + > +_ = test_run:switch('router_1') > +ok, err = vshard.router.map_callrw('echo', {1}, big_timeout_opts) > +assert(not ok and err.message) > + > +_ = test_run:switch('storage_1_a') > +test_run:wait_cond(function() return lref.count == 0 end) > +vshard.storage.bucket_force_create(bucket_id) > + > +_ = test_run:switch('storage_2_a') > +test_run:wait_cond(function() return lref.count == 0 end) > +bucket_id = box.space._bucket.index.pk:min().id > +vshard.storage.bucket_force_drop(bucket_id) > + > +_ = test_run:switch('router_1') > +ok, err = vshard.router.map_callrw('echo', {1}, big_timeout_opts) > +assert(not ok and err.message) > + > +_ = test_run:switch('storage_2_a') > +test_run:wait_cond(function() return lref.count == 0 end) > +vshard.storage.bucket_force_create(bucket_id) > + > +_ = test_run:switch('storage_1_a') > +test_run:wait_cond(function() return lref.count == 0 end) > + > +-- > +-- Storage map unit tests. > +-- > + > +-- Map fails not being able to use the ref. > +ok, err = vshard.storage._call('storage_map', 0, 'echo', {1}) > +ok, err.message > + > +-- Map fails and clears the ref when the user function fails. > +vshard.storage._call('storage_ref', 0, big_timeout) > +assert(lref.count == 1) > +ok, err = vshard.storage._call('storage_map', 0, 'raise_client_error', {}) > +assert(lref.count == 0) > +assert(not ok and err.message) > + > +-- Map fails gracefully when couldn't delete the ref. > +vshard.storage._call('storage_ref', 0, big_timeout) > +ok, err = vshard.storage._call('storage_map', 0, 'vshard.storage._call', \ > + {'storage_unref', 0}) > +assert(lref.count == 0) > +assert(not ok and err.message) > + > +-- > +-- Map fail is handled and the router tries to send unrefs. 
> +-- > +_ = test_run:switch('storage_1_a') > +need_throw = true > +function map_throw() \ > + if need_throw then \ > + raise_client_error() \ > + end \ > + return '+' \ > +end > + > +_ = test_run:switch('storage_2_a') > +need_throw = false > +function map_throw() \ > + if need_throw then \ > + raise_client_error() \ > + end \ > + return '+' \ > +end > + > +_ = test_run:switch('router_1') > +ok, err = vshard.router.map_callrw('raise_client_error', {}, big_timeout_opts) > +ok, err.message > + > +_ = test_run:switch('storage_1_a') > +need_throw = false > +test_run:wait_cond(function() return lref.count == 0 end) > + > +_ = test_run:switch('storage_2_a') > +need_throw = true > +test_run:wait_cond(function() return lref.count == 0 end) > + > +_ = test_run:switch('router_1') > +ok, err = vshard.router.map_callrw('raise_client_error', {}, big_timeout_opts) > +ok, err.message > + > +_ = test_run:switch('storage_1_a') > +test_run:wait_cond(function() return lref.count == 0 end) > + > +_ = test_run:switch('storage_2_a') > +test_run:wait_cond(function() return lref.count == 0 end) > + > +_ = test_run:switch('default') > +_ = test_run:cmd("stop server router_1") > +_ = test_run:cmd("cleanup server router_1") > +test_run:drop_cluster(REPLICASET_1) > +test_run:drop_cluster(REPLICASET_2) > +_ = test_run:cmd('clear filter') > \ No newline at end of file > diff --git a/test/router/router.result b/test/router/router.result > index 3c1d073..f9ee37c 100644 > --- a/test/router/router.result > +++ b/test/router/router.result > @@ -1163,14 +1163,15 @@ error_messages > - - Use replicaset:callro(...) instead of replicaset.callro(...) > - Use replicaset:connect_master(...) instead of replicaset.connect_master(...) > - Use replicaset:callre(...) instead of replicaset.callre(...) > - - Use replicaset:connect_replica(...) instead of replicaset.connect_replica(...) > - Use replicaset:down_replica_priority(...) instead of replicaset.down_replica_priority(...) > - - Use replicaset:callrw(...) 
instead of replicaset.callrw(...) > + - Use replicaset:connect(...) instead of replicaset.connect(...) > + - Use replicaset:wait_connected(...) instead of replicaset.wait_connected(...) > + - Use replicaset:up_replica_priority(...) instead of replicaset.up_replica_priority(...) > - Use replicaset:callbro(...) instead of replicaset.callbro(...) > - Use replicaset:connect_all(...) instead of replicaset.connect_all(...) > + - Use replicaset:connect_replica(...) instead of replicaset.connect_replica(...) > - Use replicaset:call(...) instead of replicaset.call(...) > - - Use replicaset:connect(...) instead of replicaset.connect(...) > - - Use replicaset:up_replica_priority(...) instead of replicaset.up_replica_priority(...) > + - Use replicaset:callrw(...) instead of replicaset.callrw(...) > - Use replicaset:callbre(...) instead of replicaset.callbre(...) > ... > _, replica = next(replicaset.replicas) > diff --git a/test/upgrade/upgrade.result b/test/upgrade/upgrade.result > index c2d54a3..833da3f 100644 > --- a/test/upgrade/upgrade.result > +++ b/test/upgrade/upgrade.result > @@ -162,9 +162,12 @@ vshard.storage._call ~= nil > vshard.storage._call('test_api', 1, 2, 3) > | --- > | - bucket_recv: true > + | storage_ref: true > | rebalancer_apply_routes: true > - | test_api: true > + | storage_map: true > | rebalancer_request_state: true > + | test_api: true > + | storage_unref: true > | - 1 > | - 2 > | - 3 > diff --git a/vshard/replicaset.lua b/vshard/replicaset.lua > index 7437e3b..56ea165 100644 > --- a/vshard/replicaset.lua > +++ b/vshard/replicaset.lua > @@ -139,6 +139,39 @@ local function replicaset_connect_master(replicaset) > return replicaset_connect_to_replica(replicaset, master) > end > > +-- > +-- Wait until the master instance is connected. This is necessary at least for > +-- async requests because they fail immediately if the connection is not > +-- established. 
> +-- Returns the remaining timeout because is expected to be used to connect to > +-- many replicasets in a loop, where such return saves one clock get in the > +-- caller code and is just cleaner code. > +-- > +local function replicaset_wait_connected(replicaset, timeout) > + local deadline = fiber_clock() + timeout > + local ok, res > + while true do > + local conn = replicaset_connect_master(replicaset) > + if conn.state == 'active' then > + return timeout > + end Why don't you use conn:is_connected(). It considers "fetch_schema" as appropriate state. > + -- Netbox uses fiber_cond inside, which throws an irrelevant usage error > + -- at negative timeout. Need to check the case manually. > + if timeout < 0 then > + return nil, lerror.timeout() > + end > + ok, res = pcall(conn.wait_connected, conn, timeout) > + if not ok then > + return nil, lerror.make(res) > + end > + if not res then > + return nil, lerror.timeout() > + end > + timeout = deadline - fiber_clock() > + end > + assert(false) > +end > + > -- > -- Create net.box connections to all replicas and master. > -- > @@ -483,6 +516,7 @@ local replicaset_mt = { > connect_replica = replicaset_connect_to_replica; > down_replica_priority = replicaset_down_replica_priority; > up_replica_priority = replicaset_up_replica_priority; > + wait_connected = replicaset_wait_connected, > call = replicaset_master_call; > callrw = replicaset_master_call; > callro = replicaset_template_multicallro(false, false); > diff --git a/vshard/router/init.lua b/vshard/router/init.lua > index 97bcb0a..8abd77f 100644 > --- a/vshard/router/init.lua > +++ b/vshard/router/init.lua > @@ -44,6 +44,11 @@ if not M then > module_version = 0, > -- Number of router which require collecting lua garbage. > collect_lua_garbage_cnt = 0, > + > + ----------------------- Map-Reduce ----------------------- > + -- Storage Ref ID. It must be unique for each ref request > + -- and therefore is global and monotonically growing. > + ref_id = 0, Maybe 0ULL? 
> } > end > > @@ -674,6 +679,177 @@ local function router_call(router, bucket_id, opts, ...) > ...) > end > > +local router_map_callrw > + > +if util.version_is_at_least(1, 10, 0) then > +-- > +-- Consistent Map-Reduce. The given function is called on all masters in the > +-- cluster with a guarantee that in case of success it was executed with all > +-- buckets being accessible for reads and writes. > +-- > +-- Consistency in scope of map-reduce means all the data was accessible, and > +-- didn't move during map requests execution. To preserve the consistency there > +-- is a third stage - Ref. So the algorithm is actually Ref-Map-Reduce. > +-- > +-- Refs are broadcast before Map stage to pin the buckets to their storages, and > +-- ensure they won't move until maps are done. > +-- > +-- Map requests are broadcast in case all refs are done successfully. They > +-- execute the user function + delete the refs to enable rebalancing again. > +-- > +-- On the storages there are additional means to ensure map-reduces don't block > +-- rebalancing forever and vice versa. > +-- > +-- The function is not as slow as it may seem - it uses netbox's feature > +-- is_async to send refs and maps in parallel. So cost of the function is about > +-- 2 network exchanges to the most far storage in terms of time. > +-- > +-- @param router Router instance to use. > +-- @param func Name of the function to call. > +-- @param args Function arguments passed in netbox style (as an array). > +-- @param opts Can only contain 'timeout' as a number of seconds. Note that the > +-- refs may end up being kept on the storages during this entire timeout if > +-- something goes wrong. For instance, network issues appear. This means > +-- better not use a value bigger than necessary. A stuck infinite ref can > +-- only be dropped by this router restart/reconnect or the storage restart. 
> +--
> +-- @return In case of success - a map with replicaset UUID keys and values
> +--         being what the function returned from the replicaset.
> +--
> +-- @return In case of an error - nil, error object, optional UUID of the
> +--         replicaset where the error happened. UUID may be not present if it
> +--         wasn't about concrete replicaset. For example, not all buckets were
> +--         found even though all replicasets were scanned.
> +--
> +router_map_callrw = function(router, func, args, opts)
> +    local replicasets = router.replicasets

It would be great to filter out here replicasets with bucket_count = 0 and
weight = 0. In case such "dummy" replicasets are disabled, we get a
"connection refused" error.

> +    local timeout = opts and opts.timeout or consts.CALL_TIMEOUT_MIN
> +    local deadline = fiber_clock() + timeout
> +    local err, err_uuid, res, ok, map
> +    local futures = {}
> +    local bucket_count = 0
> +    local opts_async = {is_async = true}
> +    local rs_count = 0
> +    local rid = M.ref_id
> +    M.ref_id = rid + 1
> +    -- Nil checks are done explicitly here (== nil instead of 'not'), because
> +    -- netbox requests return box.NULL instead of nils.
> +
> +    --
> +    -- Ref stage: send.
> +    --
> +    for uuid, rs in pairs(replicasets) do
> +        -- Netbox async requests work only with active connections. Need to
> +        -- wait for the connection explicitly.
> +        timeout, err = rs:wait_connected(timeout)
> +        if timeout == nil then
> +            err_uuid = uuid
> +            goto fail
> +        end
> +        res, err = rs:callrw('vshard.storage._call',
> +                             {'storage_ref', rid, timeout}, opts_async)
> +        if res == nil then
> +            err_uuid = uuid
> +            goto fail
> +        end
> +        futures[uuid] = res
> +        rs_count = rs_count + 1
> +    end
> +    map = table_new(0, rs_count)
> +    --
> +    -- Ref stage: collect.
> +    --
> +    for uuid, future in pairs(futures) do
> +        res, err = future:wait_result(timeout)
> +        -- Handle netbox error first.
> +        if res == nil then
> +            err_uuid = uuid
> +            goto fail
> +        end
> +        -- Ref returns nil,err or bucket count.
> +        res, err = unpack(res)

Seems `res, err = res[1], res[2]` could be a bit faster.

> +        if res == nil then
> +            err_uuid = uuid
> +            goto fail
> +        end
> +        bucket_count = bucket_count + res
> +        timeout = deadline - fiber_clock()
> +    end
> +    -- All refs are done but not all buckets are covered. This is odd and can
> +    -- mean many things. The most possible ones: 1) outdated configuration on
> +    -- the router and it does not see another replicaset with more buckets,
> +    -- 2) some buckets are simply lost or duplicated - could happen as a bug,
> +    -- or if the user does a maintenance of some kind by creating/deleting
> +    -- buckets. In both cases can't guarantee all the data would be covered by
> +    -- Map calls.
> +    if bucket_count ~= router.total_bucket_count then
> +        err = lerror.vshard(lerror.code.UNKNOWN_BUCKETS,
> +                            router.total_bucket_count - bucket_count)
> +        goto fail
> +    end
> +    --
> +    -- Map stage: send.
> +    --
> +    args = {'storage_map', rid, func, args}
> +    for uuid, rs in pairs(replicasets) do
> +        res, err = rs:callrw('vshard.storage._call', args, opts_async)
> +        if res == nil then
> +            err_uuid = uuid
> +            goto fail
> +        end
> +        futures[uuid] = res
> +    end
> +    --
> +    -- Map stage: collect.
> +    --
> +    for uuid, f in pairs(futures) do
> +        res, err = f:wait_result(timeout)
> +        if res == nil then
> +            err_uuid = uuid
> +            goto fail
> +        end
> +        -- Map returns true,res or nil,err.
> +        ok, res = unpack(res)
> +        if ok == nil then
> +            err = res
> +            err_uuid = uuid
> +            goto fail
> +        end
> +        if res ~= nil then
> +            -- Store as a table so in future it could be extended for
> +            -- multireturn.
> +            map[uuid] = {res}
> +        end
> +        timeout = deadline - fiber_clock()
> +    end
> +    do return map end
> +
> +::fail::
> +    for uuid, f in pairs(futures) do
> +        f:discard()
> +        -- Best effort to remove the created refs before exiting. Can help if
> +        -- the timeout was big and the error happened early.
> +        f = replicasets[uuid]:callrw('vshard.storage._call',
> +                                     {'storage_unref', rid}, opts_async)
> +        if f ~= nil then
> +            -- Don't care waiting for a result - no time for this. But it
> +            -- won't affect the request sending if the connection is still
> +            -- alive.
> +            f:discard()
> +        end
> +    end
> +    err = lerror.make(err)
> +    return nil, err, err_uuid
> +end
> +
> +-- Version >= 1.10.
> +else
> +-- Version < 1.10.
> +
> +router_map_callrw = function()
> +    error('Supported for Tarantool >= 1.10')
> +end
> +
> +end
> +
>  --
>  -- Get replicaset object by bucket identifier.
>  -- @param bucket_id Bucket identifier.
> @@ -1268,6 +1444,7 @@ local router_mt = {
>          callrw = router_callrw;
>          callre = router_callre;
>          callbre = router_callbre;
> +        map_callrw = router_map_callrw,
>          route = router_route;
>          routeall = router_routeall;
>          bucket_id = router_bucket_id,
> @@ -1365,6 +1542,9 @@ end
>  if not rawget(_G, MODULE_INTERNALS) then
>      rawset(_G, MODULE_INTERNALS, M)
>  else
> +    if not M.ref_id then
> +        M.ref_id = 0
> +    end
>      for _, router in pairs(M.routers) do
>          router_cfg(router, router.current_cfg, true)
>          setmetatable(router, router_mt)
> diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
> index 31f668f..0a14440 100644
> --- a/vshard/storage/init.lua
> +++ b/vshard/storage/init.lua
> @@ -2415,6 +2415,50 @@ local function storage_call(bucket_id, mode, name, args)
>      return ok, ret1, ret2, ret3
>  end
> 
> +--
> +-- Bind a new storage ref to the current box session. Is used as a part of
> +-- Map-Reduce API.
> +--
> +local function storage_ref(rid, timeout)
> +    local ok, err = lref.add(rid, box.session.id(), timeout)
> +    if not ok then
> +        return nil, err
> +    end
> +    return bucket_count()
> +end
> +
> +--
> +-- Drop a storage ref from the current box session. Is used as a part of
> +-- Map-Reduce API.
> +--
> +local function storage_unref(rid)
> +    return lref.del(rid, box.session.id())
> +end
> +
> +--
> +-- Execute a user's function under an infinite storage ref protecting from
> +-- bucket moves. The ref should exist before, and is deleted after, regardless
> +-- of the function result. Is used as a part of Map-Reduce API.
> +--
> +local function storage_map(rid, name, args)
> +    local ok, err, res
> +    local sid = box.session.id()
> +    ok, err = lref.use(rid, sid)
> +    if not ok then
> +        return nil, err
> +    end
> +    ok, res = local_call(name, args)
> +    if not ok then
> +        lref.del(rid, sid)
> +        return nil, lerror.make(res)
> +    end
> +    ok, err = lref.del(rid, sid)
> +    if not ok then
> +        return nil, err
> +    end
> +    return true, res
> +end
> +
>  local service_call_api
> 
>  local function service_call_test_api(...)
> @@ -2425,6 +2469,9 @@ service_call_api = setmetatable({
>      bucket_recv = bucket_recv,
>      rebalancer_apply_routes = rebalancer_apply_routes,
>      rebalancer_request_state = rebalancer_request_state,
> +    storage_ref = storage_ref,
> +    storage_unref = storage_unref,
> +    storage_map = storage_map,
>      test_api = service_call_test_api,
>  }, {__serialize = function(api)
>      local res = {}
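Back to my point about filtering replicasets: a rough sketch of what I mean,
done before the Ref stage. The `weight` field name here is what the router
keeps from the config - treat it as an assumption, not a final criterion:

```lua
-- Hypothetical pre-filter: skip replicasets which by configuration hold
-- no data, so a stopped "dummy" replicaset does not fail the whole
-- map-reduce with 'connection refused'.
local replicasets = {}
for uuid, rs in pairs(router.replicasets) do
    if rs.weight ~= 0 then
        replicasets[uuid] = rs
    end
end
```

The existing `bucket_count ~= router.total_bucket_count` check would still
catch the case when a skipped replicaset unexpectedly owns buckets, so
consistency is not weakened.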
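And to expand on my `unpack()` comment: for a fixed two-element result the
forms are interchangeable, while direct indexing avoids the table-length
lookup (and, as far as I know, `unpack()` is NYI for the LuaJIT compiler, so
it also aborts traces):

```lua
-- Equivalent extraction of a two-element netbox result.
local res = {true, 'value'}
local ok1, v1 = unpack(res)
local ok2, v2 = res[1], res[2]
assert(ok1 == ok2 and v1 == v2)
```

Not a big deal in a per-replicaset loop, but it is free to do.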