From mboxrd@z Thu Jan  1 00:00:00 1970
To: Vladislav Shpilevoy, tarantool-patches@dev.tarantool.org,
 yaroslav.dynnikov@tarantool.org
Message-ID: <837d2697-5165-23d6-72bf-f7533af10864@tarantool.org>
Date: Wed, 24 Feb 2021 13:28:17 +0300
Subject: Re: [Tarantool-patches] [PATCH vshard 11/11] router: introduce map_callrw()
List-Id: Tarantool development patches
From: Oleg Babin via Tarantool-patches
Reply-To: Oleg Babin

Thanks a lot for your patch! See 5 comments below.

On 23.02.2021 03:15, Vladislav Shpilevoy wrote:
> Closes #147

Will read-only map-reduce functions be done in the scope of a separate
issue/patch? I know about #173, but it seems we need to keep the
information about a map_callro function.

> @TarantoolBot document
> Title: vshard.router.map_callrw()
>
> `vshard.router.map_callrw()` implements consistent map-reduce over
> the entire cluster. Consistency means all the data was accessible
> and didn't move during the execution of the map requests.
>
> It is useful when one needs to access potentially all the data in
> the cluster, or simply a huge number of buckets scattered over the
> instances, for which individual `vshard.router.call()`s would take
> too long.
>
> `map_callrw()` takes the name of the function to call on the
> storages, the arguments as an array, and an optional options map.
> The only supported option for now is `timeout`, which is applied to
> the entire call, not to the individual calls for each storage.
> ```
> vshard.router.map_callrw(func_name, args[, {timeout = <number>}])
> ```
>
> The chosen function is called on the master node of each replicaset
> with the given arguments.
>
> In case of success `vshard.router.map_callrw()` returns a map with
> replicaset UUIDs as keys and the results of the user's function as
> values, like this:
> ```
> {uuid1 = {res1}, uuid2 = {res2}, ...}
> ```
> If the function returned `nil` or `box.NULL` on one of the storages,
> the corresponding replicaset won't be present in the result map.
>
> In case of failure it returns nil, an error object, and an optional
> UUID of the replicaset where the error happened. The UUID may not be
> returned if the error wasn't about a concrete replicaset.
>
> For instance, the method fails if not all buckets were found, even
> if all replicasets were scanned successfully.
>
> Handling the result looks like this:
> ```Lua
> res, err, uuid = vshard.router.map_callrw(...)
> if not res then
>     -- Error.
>     -- 'err' - error object. 'uuid' - optional UUID of the
>     -- replicaset where the error happened.
>     ...
> else
>     -- Success.
>     for uuid, value in pairs(res) do
>         ...
>     end
> end
> ```
>
> Map-Reduce in vshard works in 3 stages: Ref, Map, Reduce. Ref is an
> internal stage which is supposed to ensure data consistency during
> the execution of the user's function on all nodes.
>
> Reduce is not performed by vshard. It is what the user's code does
> with the results of `map_callrw()`.
>
> Consistency, as it is defined for map-reduce, is not compatible with
> rebalancing, because any bucket move would make the sender and
> receiver nodes 'inconsistent' - it would not be possible to call a
> function on them which could simply access all the data without
> doing `vshard.storage.bucket_ref()`.
>
> This makes the Ref stage very intricate, as it must work together
> with the rebalancer to ensure neither of them blocks the other.
>
> For this the storage has a scheduler specifically for bucket moves
> and storage refs, which shares storage time between them fairly.
>
> The definition of fairness depends on how long and frequent the
> moves and refs are. This can be configured using the storage options
> `sched_move_quota` and `sched_ref_quota`. See more details about
> them in the corresponding doc section.
>
> The scheduler configuration may affect map-reduce requests if they
> are used a lot during rebalancing.
>
> Keep in mind that it is not a good idea to use too big timeouts for
> `map_callrw()`, because the router will try to block the bucket
> moves for the given timeout on all storages, and in case something
> goes wrong the block will remain for the entire timeout. This means,
> in particular, that a timeout longer than, say, minutes is a very
> bad choice unless it is for tests only.
>
> Also it is important to remember that `map_callrw()` does not work
> on replicas. It works only on masters. This makes it unusable if at
> least one replicaset has its master node down.
> ---
>  test/router/map-reduce.result   | 636 ++++++++++++++++++++++++++++++++
>  test/router/map-reduce.test.lua | 258 +++++++++++++
>  test/router/router.result       |   9 +-
>  test/upgrade/upgrade.result     |   5 +-
>  vshard/replicaset.lua           |  34 ++
>  vshard/router/init.lua          | 180 +++++++++
>  vshard/storage/init.lua         |  47 +++
>  7 files changed, 1164 insertions(+), 5 deletions(-)
>  create mode 100644 test/router/map-reduce.result
>  create mode 100644 test/router/map-reduce.test.lua
>
> diff --git a/test/router/map-reduce.result b/test/router/map-reduce.result
> new file mode 100644
> index 0000000..1e8995a
> --- /dev/null
> +++ b/test/router/map-reduce.result
> @@ -0,0 +1,636 @@
> +-- test-run result file version 2
> +test_run = require('test_run').new()
> + | ---
> + | ...
> +REPLICASET_1 = { 'storage_1_a', 'storage_1_b' }
> + | ---
> + | ...
> +REPLICASET_2 = { 'storage_2_a', 'storage_2_b' }
> + | ---
> + | ...
> +test_run:create_cluster(REPLICASET_1, 'router')
> + | ---
> + | ...
> +test_run:create_cluster(REPLICASET_2, 'router')
> + | ---
> + | ...
> +util = require('util')
> + | ---
> + | ...
> +util.wait_master(test_run, REPLICASET_1, 'storage_1_a')
> + | ---
> + | ...
> +util.wait_master(test_run, REPLICASET_2, 'storage_2_a')
> + | ---
> + | ...
> +util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, 'bootstrap_storage()')
> + | ---
> + | ...
> +util.push_rs_filters(test_run)
> + | ---
> + | ...
> +_ = test_run:cmd("create server router_1 with script='router/router_1.lua'")
> + | ---
> + | ...
> +_ = test_run:cmd("start server router_1")
> + | ---
> + | ...
> +
> +_ = test_run:switch("router_1")
> + | ---
> + | ...
> +util = require('util')
> + | ---
> + | ...
> +
> +--
> +-- gh-147: consistent map-reduce.
> +--
> +big_timeout = 1000000
> + | ---
> + | ...
> +big_timeout_opts = {timeout = big_timeout}
> + | ---
> + | ...
> +vshard.router.cfg(cfg)
> + | ---
> + | ...
> +vshard.router.bootstrap(big_timeout_opts)
> + | ---
> + | - true
> + | ...
> +-- Trivial basic sanity test. Multireturn is not supported, should be truncated.
> +vshard.router.map_callrw('echo', {1, 2, 3}, big_timeout_opts)
> + | ---
> + | - :
> + |   - 1
> + |   :
> + |   - 1
> + | ...
> +
> +--
> +-- Fail during connecting to storages. For the succeeded storages the router
> +-- tries to send unref.
> +--
> +timeout = 0.001
> + | ---
> + | ...
> +timeout_opts = {timeout = timeout}
> + | ---
> + | ...
> +
> +test_run:cmd('stop server storage_1_a')
> + | ---
> + | - true
> + | ...
> +ok, err = vshard.router.map_callrw('echo', {1}, timeout_opts) > + | --- > + | ... > +assert(not ok and err.message) > + | --- > + | - Timeout exceeded > + | ... > +-- Even if ref was sent successfully to storage_2_a, it was deleted before > +-- router returned an error. > +_ = test_run:switch('storage_2_a') > + | --- > + | ... > +lref = require('vshard.storage.ref') > + | --- > + | ... > +-- Wait because unref is sent asynchronously. Could arrive not immediately. > +test_run:wait_cond(function() return lref.count == 0 end) > + | --- > + | - true > + | ... > + > +_ = test_run:switch('router_1') > + | --- > + | ... > +test_run:cmd('start server storage_1_a') > + | --- > + | - true > + | ... > +-- Works again - router waited for connection being established. > +vshard.router.map_callrw('echo', {1}, big_timeout_opts) > + | --- > + | - : > + | - 1 > + | : > + | - 1 > + | ... > + > +-- > +-- Do all the same but with another storage being stopped. The same test is done > +-- again because can't tell at which of the tests to where the router will go > +-- first. > +-- > +test_run:cmd('stop server storage_2_a') > + | --- > + | - true > + | ... > +ok, err = vshard.router.map_callrw('echo', {1}, timeout_opts) > + | --- > + | ... > +assert(not ok and err.message) > + | --- > + | - Timeout exceeded > + | ... > +_ = test_run:switch('storage_1_a') > + | --- > + | ... > +lref = require('vshard.storage.ref') > + | --- > + | ... > +test_run:wait_cond(function() return lref.count == 0 end) > + | --- > + | - true > + | ... > + > +_ = test_run:switch('router_1') > + | --- > + | ... > +test_run:cmd('start server storage_2_a') > + | --- > + | - true > + | ... > +vshard.router.map_callrw('echo', {1}, big_timeout_opts) > + | --- > + | - : > + | - 1 > + | : > + | - 1 > + | ... > + > +-- > +-- Fail at ref stage handling. Unrefs are sent to cancel those refs which > +-- succeeded. To simulate a ref fail make the router think there is a moving > +-- bucket. > +-- > +_ = test_run:switch('storage_1_a') > + | --- > + | ... > +lsched = require('vshard.storage.sched') > + | --- > + | ... > +big_timeout = 1000000 > + | --- > + | ... > +lsched.move_start(big_timeout) > + | --- > + | - 1000000 > + | ... > + > +_ = test_run:switch('router_1') > + | --- > + | ... > +ok, err = vshard.router.map_callrw('echo', {1}, timeout_opts) > + | --- > + | ... > +assert(not ok and err.message) > + | --- > + | - Timeout exceeded > + | ... > + > +_ = test_run:switch('storage_2_a') > + | --- > + | ... > +lsched = require('vshard.storage.sched') > + | --- > + | ... > +lref = require('vshard.storage.ref') > + | --- > + | ... > +test_run:wait_cond(function() return lref.count == 0 end) > + | --- > + | - true > + | ... > + > +-- > +-- Do all the same with another storage being busy with a 'move'. > +-- > +big_timeout = 1000000 > + | --- > + | ... > +lsched.move_start(big_timeout) > + | --- > + | - 1000000 > + | ... > + > +_ = test_run:switch('storage_1_a') > + | --- > + | ... > +lref = require('vshard.storage.ref') > + | --- > + | ... > +lsched.move_end(1) > + | --- > + | ... > +assert(lref.count == 0) > + | --- > + | - true > + | ... > + > +_ = test_run:switch('router_1') > + | --- > + | ... > +ok, err = vshard.router.map_callrw('echo', {1}, timeout_opts) > + | --- > + | ... > +assert(not ok and err.message) > + | --- > + | - Timeout exceeded > + | ... > + > +_ = test_run:switch('storage_1_a') > + | --- > + | ... > +test_run:wait_cond(function() return lref.count == 0 end) > + | --- > + | - true > + | ... 
> + > +_ = test_run:switch('storage_2_a') > + | --- > + | ... > +lsched.move_end(1) > + | --- > + | ... > +assert(lref.count == 0) > + | --- > + | - true > + | ... > + > +_ = test_run:switch('router_1') > + | --- > + | ... > +vshard.router.map_callrw('echo', {1}, big_timeout_opts) > + | --- > + | - : > + | - 1 > + | : > + | - 1 > + | ... > + > +-- > +-- Ref can fail earlier than by a timeout. Router still should broadcast unrefs > +-- correctly. To simulate ref fail add a duplicate manually. > +-- > +_ = test_run:switch('storage_1_a') > + | --- > + | ... > +box.schema.user.grant('storage', 'super') > + | --- > + | ... > +router_sid = nil > + | --- > + | ... > +function save_router_sid() \ > + router_sid = box.session.id() \ > +end > + | --- > + | ... > + > +_ = test_run:switch('storage_2_a') > + | --- > + | ... > +box.schema.user.grant('storage', 'super') > + | --- > + | ... > +router_sid = nil > + | --- > + | ... > +function save_router_sid() \ > + router_sid = box.session.id() \ > +end > + | --- > + | ... > + > +_ = test_run:switch('router_1') > + | --- > + | ... > +vshard.router.map_callrw('save_router_sid', {}, big_timeout_opts) > + | --- > + | - [] > + | ... > + > +_ = test_run:switch('storage_1_a') > + | --- > + | ... > +lref.add(1, router_sid, big_timeout) > + | --- > + | - true > + | ... > + > +_ = test_run:switch('router_1') > + | --- > + | ... > +vshard.router.internal.ref_id = 1 > + | --- > + | ... > +ok, err = vshard.router.map_callrw('echo', {1}, big_timeout_opts) > + | --- > + | ... > +assert(not ok and err.message) > + | --- > + | - 'Can not add a storage ref: duplicate ref' > + | ... > + > +_ = test_run:switch('storage_1_a') > + | --- > + | ... > +_ = lref.del(1, router_sid) > + | --- > + | ... > + > +_ = test_run:switch('storage_2_a') > + | --- > + | ... > +test_run:wait_cond(function() return lref.count == 0 end) > + | --- > + | - true > + | ... > +lref.add(1, router_sid, big_timeout) > + | --- > + | - true > + | ... > + > +_ = test_run:switch('router_1') > + | --- > + | ... > +vshard.router.internal.ref_id = 1 > + | --- > + | ... > +ok, err = vshard.router.map_callrw('echo', {1}, big_timeout_opts) > + | --- > + | ... > +assert(not ok and err.message) > + | --- > + | - 'Can not add a storage ref: duplicate ref' > + | ... > + > +_ = test_run:switch('storage_2_a') > + | --- > + | ... > +_ = lref.del(1, router_sid) > + | --- > + | ... > + > +_ = test_run:switch('storage_1_a') > + | --- > + | ... > +test_run:wait_cond(function() return lref.count == 0 end) > + | --- > + | - true > + | ... > + > +-- > +-- Fail if some buckets are not visible. Even if all the known replicasets were > +-- scanned. It means consistency violation. > +-- > +_ = test_run:switch('storage_1_a') > + | --- > + | ... > +bucket_id = box.space._bucket.index.pk:min().id > + | --- > + | ... > +vshard.storage.bucket_force_drop(bucket_id) > + | --- > + | - true > + | ... > + > +_ = test_run:switch('router_1') > + | --- > + | ... > +ok, err = vshard.router.map_callrw('echo', {1}, big_timeout_opts) > + | --- > + | ... > +assert(not ok and err.message) > + | --- > + | - 1 buckets are not discovered > + | ... > + > +_ = test_run:switch('storage_1_a') > + | --- > + | ... > +test_run:wait_cond(function() return lref.count == 0 end) > + | --- > + | - true > + | ... > +vshard.storage.bucket_force_create(bucket_id) > + | --- > + | - true > + | ... > + > +_ = test_run:switch('storage_2_a') > + | --- > + | ... > +test_run:wait_cond(function() return lref.count == 0 end) > + | --- > + | - true > + | ... 
> +bucket_id = box.space._bucket.index.pk:min().id > + | --- > + | ... > +vshard.storage.bucket_force_drop(bucket_id) > + | --- > + | - true > + | ... > + > +_ = test_run:switch('router_1') > + | --- > + | ... > +ok, err = vshard.router.map_callrw('echo', {1}, big_timeout_opts) > + | --- > + | ... > +assert(not ok and err.message) > + | --- > + | - 1 buckets are not discovered > + | ... > + > +_ = test_run:switch('storage_2_a') > + | --- > + | ... > +test_run:wait_cond(function() return lref.count == 0 end) > + | --- > + | - true > + | ... > +vshard.storage.bucket_force_create(bucket_id) > + | --- > + | - true > + | ... > + > +_ = test_run:switch('storage_1_a') > + | --- > + | ... > +test_run:wait_cond(function() return lref.count == 0 end) > + | --- > + | - true > + | ... > + > +-- > +-- Storage map unit tests. > +-- > + > +-- Map fails not being able to use the ref. > +ok, err = vshard.storage._call('storage_map', 0, 'echo', {1}) > + | --- > + | ... > +ok, err.message > + | --- > + | - null > + | - 'Can not use a storage ref: no session' > + | ... > + > +-- Map fails and clears the ref when the user function fails. > +vshard.storage._call('storage_ref', 0, big_timeout) > + | --- > + | - 1500 > + | ... > +assert(lref.count == 1) > + | --- > + | - true > + | ... > +ok, err = vshard.storage._call('storage_map', 0, 'raise_client_error', {}) > + | --- > + | ... > +assert(lref.count == 0) > + | --- > + | - true > + | ... > +assert(not ok and err.message) > + | --- > + | - Unknown error > + | ... > + > +-- Map fails gracefully when couldn't delete the ref. > +vshard.storage._call('storage_ref', 0, big_timeout) > + | --- > + | - 1500 > + | ... > +ok, err = vshard.storage._call('storage_map', 0, 'vshard.storage._call', \ > + {'storage_unref', 0}) > + | --- > + | ... > +assert(lref.count == 0) > + | --- > + | - true > + | ... > +assert(not ok and err.message) > + | --- > + | - 'Can not delete a storage ref: no ref' > + | ... > + > +-- > +-- Map fail is handled and the router tries to send unrefs. > +-- > +_ = test_run:switch('storage_1_a') > + | --- > + | ... > +need_throw = true > + | --- > + | ... > +function map_throw() \ > + if need_throw then \ > + raise_client_error() \ > + end \ > + return '+' \ > +end > + | --- > + | ... > + > +_ = test_run:switch('storage_2_a') > + | --- > + | ... > +need_throw = false > + | --- > + | ... > +function map_throw() \ > + if need_throw then \ > + raise_client_error() \ > + end \ > + return '+' \ > +end > + | --- > + | ... > + > +_ = test_run:switch('router_1') > + | --- > + | ... > +ok, err = vshard.router.map_callrw('raise_client_error', {}, big_timeout_opts) > + | --- > + | ... > +ok, err.message > + | --- > + | - null > + | - Unknown error > + | ... > + > +_ = test_run:switch('storage_1_a') > + | --- > + | ... > +need_throw = false > + | --- > + | ... > +test_run:wait_cond(function() return lref.count == 0 end) > + | --- > + | - true > + | ... > + > +_ = test_run:switch('storage_2_a') > + | --- > + | ... > +need_throw = true > + | --- > + | ... > +test_run:wait_cond(function() return lref.count == 0 end) > + | --- > + | - true > + | ... > + > +_ = test_run:switch('router_1') > + | --- > + | ... > +ok, err = vshard.router.map_callrw('raise_client_error', {}, big_timeout_opts) > + | --- > + | ... > +ok, err.message > + | --- > + | - null > + | - Unknown error > + | ... > + > +_ = test_run:switch('storage_1_a') > + | --- > + | ... > +test_run:wait_cond(function() return lref.count == 0 end) > + | --- > + | - true > + | ... 
> + > +_ = test_run:switch('storage_2_a') > + | --- > + | ... > +test_run:wait_cond(function() return lref.count == 0 end) > + | --- > + | - true > + | ... > + > +_ = test_run:switch('default') > + | --- > + | ... > +_ = test_run:cmd("stop server router_1") > + | --- > + | ... > +_ = test_run:cmd("cleanup server router_1") > + | --- > + | ... > +test_run:drop_cluster(REPLICASET_1) > + | --- > + | ... > +test_run:drop_cluster(REPLICASET_2) > + | --- > + | ... > +_ = test_run:cmd('clear filter') > + | --- > + | ... > diff --git a/test/router/map-reduce.test.lua b/test/router/map-reduce.test.lua > new file mode 100644 > index 0000000..3b63248 > --- /dev/null > +++ b/test/router/map-reduce.test.lua > @@ -0,0 +1,258 @@ > +test_run = require('test_run').new() > +REPLICASET_1 = { 'storage_1_a', 'storage_1_b' } > +REPLICASET_2 = { 'storage_2_a', 'storage_2_b' } > +test_run:create_cluster(REPLICASET_1, 'router') > +test_run:create_cluster(REPLICASET_2, 'router') > +util = require('util') > +util.wait_master(test_run, REPLICASET_1, 'storage_1_a') > +util.wait_master(test_run, REPLICASET_2, 'storage_2_a') > +util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, 'bootstrap_storage()') > +util.push_rs_filters(test_run) > +_ = test_run:cmd("create server router_1 with script='router/router_1.lua'") > +_ = test_run:cmd("start server router_1") > + > +_ = test_run:switch("router_1") > +util = require('util') > + > +-- > +-- gh-147: consistent map-reduce. > +-- > +big_timeout = 1000000 > +big_timeout_opts = {timeout = big_timeout} > +vshard.router.cfg(cfg) > +vshard.router.bootstrap(big_timeout_opts) > +-- Trivial basic sanity test. Multireturn is not supported, should be truncated. > +vshard.router.map_callrw('echo', {1, 2, 3}, big_timeout_opts) > + > +-- > +-- Fail during connecting to storages. For the succeeded storages the router > +-- tries to send unref. > +-- > +timeout = 0.001 > +timeout_opts = {timeout = timeout} > + > +test_run:cmd('stop server storage_1_a') > +ok, err = vshard.router.map_callrw('echo', {1}, timeout_opts) > +assert(not ok and err.message) > +-- Even if ref was sent successfully to storage_2_a, it was deleted before > +-- router returned an error. > +_ = test_run:switch('storage_2_a') > +lref = require('vshard.storage.ref') > +-- Wait because unref is sent asynchronously. Could arrive not immediately. > +test_run:wait_cond(function() return lref.count == 0 end) > + > +_ = test_run:switch('router_1') > +test_run:cmd('start server storage_1_a') > +-- Works again - router waited for connection being established. > +vshard.router.map_callrw('echo', {1}, big_timeout_opts) > + > +-- > +-- Do all the same but with another storage being stopped. The same test is done > +-- again because can't tell at which of the tests to where the router will go > +-- first. > +-- > +test_run:cmd('stop server storage_2_a') > +ok, err = vshard.router.map_callrw('echo', {1}, timeout_opts) > +assert(not ok and err.message) > +_ = test_run:switch('storage_1_a') > +lref = require('vshard.storage.ref') > +test_run:wait_cond(function() return lref.count == 0 end) > + > +_ = test_run:switch('router_1') > +test_run:cmd('start server storage_2_a') > +vshard.router.map_callrw('echo', {1}, big_timeout_opts) > + > +-- > +-- Fail at ref stage handling. Unrefs are sent to cancel those refs which > +-- succeeded. To simulate a ref fail make the router think there is a moving > +-- bucket. 
> +-- > +_ = test_run:switch('storage_1_a') > +lsched = require('vshard.storage.sched') > +big_timeout = 1000000 > +lsched.move_start(big_timeout) > + > +_ = test_run:switch('router_1') > +ok, err = vshard.router.map_callrw('echo', {1}, timeout_opts) > +assert(not ok and err.message) > + > +_ = test_run:switch('storage_2_a') > +lsched = require('vshard.storage.sched') > +lref = require('vshard.storage.ref') > +test_run:wait_cond(function() return lref.count == 0 end) > + > +-- > +-- Do all the same with another storage being busy with a 'move'. > +-- > +big_timeout = 1000000 > +lsched.move_start(big_timeout) > + > +_ = test_run:switch('storage_1_a') > +lref = require('vshard.storage.ref') > +lsched.move_end(1) > +assert(lref.count == 0) > + > +_ = test_run:switch('router_1') > +ok, err = vshard.router.map_callrw('echo', {1}, timeout_opts) > +assert(not ok and err.message) > + > +_ = test_run:switch('storage_1_a') > +test_run:wait_cond(function() return lref.count == 0 end) > + > +_ = test_run:switch('storage_2_a') > +lsched.move_end(1) > +assert(lref.count == 0) > + > +_ = test_run:switch('router_1') > +vshard.router.map_callrw('echo', {1}, big_timeout_opts) > + > +-- > +-- Ref can fail earlier than by a timeout. Router still should broadcast unrefs > +-- correctly. To simulate ref fail add a duplicate manually. > +-- > +_ = test_run:switch('storage_1_a') > +box.schema.user.grant('storage', 'super') > +router_sid = nil > +function save_router_sid() \ > + router_sid = box.session.id() \ > +end > + > +_ = test_run:switch('storage_2_a') > +box.schema.user.grant('storage', 'super') > +router_sid = nil > +function save_router_sid() \ > + router_sid = box.session.id() \ > +end > + > +_ = test_run:switch('router_1') > +vshard.router.map_callrw('save_router_sid', {}, big_timeout_opts) > + > +_ = test_run:switch('storage_1_a') > +lref.add(1, router_sid, big_timeout) > + > +_ = test_run:switch('router_1') > +vshard.router.internal.ref_id = 1 > +ok, err = vshard.router.map_callrw('echo', {1}, big_timeout_opts) > +assert(not ok and err.message) > + > +_ = test_run:switch('storage_1_a') > +_ = lref.del(1, router_sid) > + > +_ = test_run:switch('storage_2_a') > +test_run:wait_cond(function() return lref.count == 0 end) > +lref.add(1, router_sid, big_timeout) > + > +_ = test_run:switch('router_1') > +vshard.router.internal.ref_id = 1 > +ok, err = vshard.router.map_callrw('echo', {1}, big_timeout_opts) > +assert(not ok and err.message) > + > +_ = test_run:switch('storage_2_a') > +_ = lref.del(1, router_sid) > + > +_ = test_run:switch('storage_1_a') > +test_run:wait_cond(function() return lref.count == 0 end) > + > +-- > +-- Fail if some buckets are not visible. Even if all the known replicasets were > +-- scanned. It means consistency violation. 
> +-- > +_ = test_run:switch('storage_1_a') > +bucket_id = box.space._bucket.index.pk:min().id > +vshard.storage.bucket_force_drop(bucket_id) > + > +_ = test_run:switch('router_1') > +ok, err = vshard.router.map_callrw('echo', {1}, big_timeout_opts) > +assert(not ok and err.message) > + > +_ = test_run:switch('storage_1_a') > +test_run:wait_cond(function() return lref.count == 0 end) > +vshard.storage.bucket_force_create(bucket_id) > + > +_ = test_run:switch('storage_2_a') > +test_run:wait_cond(function() return lref.count == 0 end) > +bucket_id = box.space._bucket.index.pk:min().id > +vshard.storage.bucket_force_drop(bucket_id) > + > +_ = test_run:switch('router_1') > +ok, err = vshard.router.map_callrw('echo', {1}, big_timeout_opts) > +assert(not ok and err.message) > + > +_ = test_run:switch('storage_2_a') > +test_run:wait_cond(function() return lref.count == 0 end) > +vshard.storage.bucket_force_create(bucket_id) > + > +_ = test_run:switch('storage_1_a') > +test_run:wait_cond(function() return lref.count == 0 end) > + > +-- > +-- Storage map unit tests. > +-- > + > +-- Map fails not being able to use the ref. > +ok, err = vshard.storage._call('storage_map', 0, 'echo', {1}) > +ok, err.message > + > +-- Map fails and clears the ref when the user function fails. > +vshard.storage._call('storage_ref', 0, big_timeout) > +assert(lref.count == 1) > +ok, err = vshard.storage._call('storage_map', 0, 'raise_client_error', {}) > +assert(lref.count == 0) > +assert(not ok and err.message) > + > +-- Map fails gracefully when couldn't delete the ref. > +vshard.storage._call('storage_ref', 0, big_timeout) > +ok, err = vshard.storage._call('storage_map', 0, 'vshard.storage._call', \ > + {'storage_unref', 0}) > +assert(lref.count == 0) > +assert(not ok and err.message) > + > +-- > +-- Map fail is handled and the router tries to send unrefs. > +-- > +_ = test_run:switch('storage_1_a') > +need_throw = true > +function map_throw() \ > + if need_throw then \ > + raise_client_error() \ > + end \ > + return '+' \ > +end > + > +_ = test_run:switch('storage_2_a') > +need_throw = false > +function map_throw() \ > + if need_throw then \ > + raise_client_error() \ > + end \ > + return '+' \ > +end > + > +_ = test_run:switch('router_1') > +ok, err = vshard.router.map_callrw('raise_client_error', {}, big_timeout_opts) > +ok, err.message > + > +_ = test_run:switch('storage_1_a') > +need_throw = false > +test_run:wait_cond(function() return lref.count == 0 end) > + > +_ = test_run:switch('storage_2_a') > +need_throw = true > +test_run:wait_cond(function() return lref.count == 0 end) > + > +_ = test_run:switch('router_1') > +ok, err = vshard.router.map_callrw('raise_client_error', {}, big_timeout_opts) > +ok, err.message > + > +_ = test_run:switch('storage_1_a') > +test_run:wait_cond(function() return lref.count == 0 end) > + > +_ = test_run:switch('storage_2_a') > +test_run:wait_cond(function() return lref.count == 0 end) > + > +_ = test_run:switch('default') > +_ = test_run:cmd("stop server router_1") > +_ = test_run:cmd("cleanup server router_1") > +test_run:drop_cluster(REPLICASET_1) > +test_run:drop_cluster(REPLICASET_2) > +_ = test_run:cmd('clear filter') > \ No newline at end of file > diff --git a/test/router/router.result b/test/router/router.result > index 3c1d073..f9ee37c 100644 > --- a/test/router/router.result > +++ b/test/router/router.result > @@ -1163,14 +1163,15 @@ error_messages > - - Use replicaset:callro(...) instead of replicaset.callro(...) > - Use replicaset:connect_master(...) 
instead of replicaset.connect_master(...)
>    - Use replicaset:callre(...) instead of replicaset.callre(...)
> -  - Use replicaset:connect_replica(...) instead of replicaset.connect_replica(...)
>    - Use replicaset:down_replica_priority(...) instead of replicaset.down_replica_priority(...)
> -  - Use replicaset:callrw(...) instead of replicaset.callrw(...)
> +  - Use replicaset:connect(...) instead of replicaset.connect(...)
> +  - Use replicaset:wait_connected(...) instead of replicaset.wait_connected(...)
> +  - Use replicaset:up_replica_priority(...) instead of replicaset.up_replica_priority(...)
>    - Use replicaset:callbro(...) instead of replicaset.callbro(...)
>    - Use replicaset:connect_all(...) instead of replicaset.connect_all(...)
> +  - Use replicaset:connect_replica(...) instead of replicaset.connect_replica(...)
>    - Use replicaset:call(...) instead of replicaset.call(...)
> -  - Use replicaset:connect(...) instead of replicaset.connect(...)
> -  - Use replicaset:up_replica_priority(...) instead of replicaset.up_replica_priority(...)
> +  - Use replicaset:callrw(...) instead of replicaset.callrw(...)
>    - Use replicaset:callbre(...) instead of replicaset.callbre(...)
>  ...
>  _, replica = next(replicaset.replicas)
> diff --git a/test/upgrade/upgrade.result b/test/upgrade/upgrade.result
> index c2d54a3..833da3f 100644
> --- a/test/upgrade/upgrade.result
> +++ b/test/upgrade/upgrade.result
> @@ -162,9 +162,12 @@ vshard.storage._call ~= nil
>  vshard.storage._call('test_api', 1, 2, 3)
>   | ---
>   | - bucket_recv: true
> + |   storage_ref: true
>   |   rebalancer_apply_routes: true
> - |   test_api: true
> + |   storage_map: true
>   |   rebalancer_request_state: true
> + |   test_api: true
> + |   storage_unref: true
>   | - 1
>   | - 2
>   | - 3
> diff --git a/vshard/replicaset.lua b/vshard/replicaset.lua
> index 7437e3b..56ea165 100644
> --- a/vshard/replicaset.lua
> +++ b/vshard/replicaset.lua
> @@ -139,6 +139,39 @@ local function replicaset_connect_master(replicaset)
>      return replicaset_connect_to_replica(replicaset, master)
>  end
>
> +--
> +-- Wait until the master instance is connected. This is necessary at least for
> +-- async requests, because they fail immediately if the connection is not
> +-- established.
> +-- Returns the remaining timeout, because it is expected to be used to connect
> +-- to many replicasets in a loop, where such a return saves one clock get in
> +-- the caller code and is just cleaner.
> +--
> +local function replicaset_wait_connected(replicaset, timeout)
> +    local deadline = fiber_clock() + timeout
> +    local ok, res
> +    while true do
> +        local conn = replicaset_connect_master(replicaset)
> +        if conn.state == 'active' then
> +            return timeout
> +        end

Why don't you use conn:is_connected()? It considers "fetch_schema" an
appropriate state as well.
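
To illustrate the idea (just a sketch, not a patch - `replicaset_connect_master`
is the helper from the hunk above):

```Lua
-- Sketch: let netbox itself decide what counts as connected.
-- is_connected() returns true for both the 'active' and 'fetch_schema'
-- states, so the loop would not keep waiting while the schema is still
-- being fetched.
local conn = replicaset_connect_master(replicaset)
if conn:is_connected() then
    return timeout
end
```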
> +        -- Netbox uses fiber_cond inside, which throws an irrelevant usage
> +        -- error at a negative timeout. Need to check the case manually.
> +        if timeout < 0 then
> +            return nil, lerror.timeout()
> +        end
> +        ok, res = pcall(conn.wait_connected, conn, timeout)
> +        if not ok then
> +            return nil, lerror.make(res)
> +        end
> +        if not res then
> +            return nil, lerror.timeout()
> +        end
> +        timeout = deadline - fiber_clock()
> +    end
> +    assert(false)
> +end
> +
>  --
>  -- Create net.box connections to all replicas and master.
>  --
> @@ -483,6 +516,7 @@ local replicaset_mt = {
>          connect_replica = replicaset_connect_to_replica;
>          down_replica_priority = replicaset_down_replica_priority;
>          up_replica_priority = replicaset_up_replica_priority;
> +        wait_connected = replicaset_wait_connected,
>          call = replicaset_master_call;
>          callrw = replicaset_master_call;
>          callro = replicaset_template_multicallro(false, false);
> diff --git a/vshard/router/init.lua b/vshard/router/init.lua
> index 97bcb0a..8abd77f 100644
> --- a/vshard/router/init.lua
> +++ b/vshard/router/init.lua
> @@ -44,6 +44,11 @@ if not M then
>          module_version = 0,
>          -- Number of router which require collecting lua garbage.
>          collect_lua_garbage_cnt = 0,
> +
> +        ----------------------- Map-Reduce -----------------------
> +        -- Storage Ref ID. It must be unique for each ref request
> +        -- and therefore is global and monotonically growing.
> +        ref_id = 0,

Maybe 0ULL? A plain Lua number stays an exact integer only up to 2^53,
so a 64-bit unsigned counter would never risk losing uniqueness.

>      }
>  end
>
> @@ -674,6 +679,177 @@ local function router_call(router, bucket_id, opts, ...)
>                         ...)
>  end
>
> +local router_map_callrw
> +
> +if util.version_is_at_least(1, 10, 0) then
> +--
> +-- Consistent Map-Reduce. The given function is called on all masters in
> +-- the cluster with a guarantee that in case of success it was executed
> +-- with all buckets being accessible for reads and writes.
> +--
> +-- Consistency in the scope of map-reduce means all the data was
> +-- accessible and didn't move during the execution of the map requests.
> +-- To preserve the consistency there is a third stage - Ref. So the
> +-- algorithm is actually Ref-Map-Reduce.
> +--
> +-- Refs are broadcast before the Map stage to pin the buckets to their
> +-- storages and ensure they won't move until the maps are done.
> +--
> +-- Map requests are broadcast in case all refs are done successfully.
> +-- They execute the user function + delete the refs to enable
> +-- rebalancing again.
> +--
> +-- On the storages there are additional means to ensure map-reduces
> +-- don't block rebalancing forever and vice versa.
> +--
> +-- The function is not as slow as it may seem - it uses netbox's
> +-- is_async feature to send refs and maps in parallel. So the cost of
> +-- the function is about 2 network exchanges to the farthest storage in
> +-- terms of time.
> +--
> +-- @param router Router instance to use.
> +-- @param func Name of the function to call.
> +-- @param args Function arguments passed in netbox style (as an array).
> +-- @param opts Can only contain 'timeout' as a number of seconds. Note
> +--        that the refs may end up being kept on the storages during
> +--        this entire timeout if something goes wrong - for instance,
> +--        network issues appear. This means it is better not to use a
> +--        value bigger than necessary. A stuck infinite ref can only be
> +--        dropped by this router's restart/reconnect or by the storage's
> +--        restart.
> +--
> +-- @return In case of success - a map with replicaset UUID keys and
> +--         values being what the function returned from the replicaset.
> +--
> +-- @return In case of an error - nil, an error object, and an optional
> +--         UUID of the replicaset where the error happened. The UUID may
> +--         not be present if the error wasn't about a concrete
> +--         replicaset. For example, not all buckets were found even
> +--         though all replicasets were scanned.
> +--
> +router_map_callrw = function(router, func, args, opts)
> +    local replicasets = router.replicasets

It would be great to filter out replicasets with bucket_count = 0 and
weight = 0 here. If such "dummy" replicasets are disabled, we get a
"connection refused" error.
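
Roughly like this at the top of the function (a sketch; I assume here
that the replicaset object keeps its configured `weight`, and that a
zero-weight replicaset owns zero buckets, so skipping it does not break
the bucket count check below):

```Lua
-- Sketch: build the target set without "dummy" replicasets. If such a
-- replicaset is disabled, sending a ref to it fails with
-- 'connection refused'.
local replicasets = {}
for uuid, rs in pairs(router.replicasets) do
    if rs.weight ~= 0 then
        replicasets[uuid] = rs
    end
end
```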
> +    local timeout = opts and opts.timeout or consts.CALL_TIMEOUT_MIN
> +    local deadline = fiber_clock() + timeout
> +    local err, err_uuid, res, ok, map
> +    local futures = {}
> +    local bucket_count = 0
> +    local opts_async = {is_async = true}
> +    local rs_count = 0
> +    local rid = M.ref_id
> +    M.ref_id = rid + 1
> +    -- Nil checks are done explicitly here (== nil instead of 'not'),
> +    -- because netbox requests return box.NULL instead of nils.
> +
> +    --
> +    -- Ref stage: send.
> +    --
> +    for uuid, rs in pairs(replicasets) do
> +        -- Netbox async requests work only with active connections. Need
> +        -- to wait for the connection explicitly.
> +        timeout, err = rs:wait_connected(timeout)
> +        if timeout == nil then
> +            err_uuid = uuid
> +            goto fail
> +        end
> +        res, err = rs:callrw('vshard.storage._call',
> +                             {'storage_ref', rid, timeout}, opts_async)
> +        if res == nil then
> +            err_uuid = uuid
> +            goto fail
> +        end
> +        futures[uuid] = res
> +        rs_count = rs_count + 1
> +    end
> +    map = table_new(0, rs_count)
> +    --
> +    -- Ref stage: collect.
> +    --
> +    for uuid, future in pairs(futures) do
> +        res, err = future:wait_result(timeout)
> +        -- Handle netbox error first.
> +        if res == nil then
> +            err_uuid = uuid
> +            goto fail
> +        end
> +        -- Ref returns nil,err or bucket count.
> +        res, err = unpack(res)

It seems `res, err = res[1], res[2]` could be a bit faster.
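
An illustration of the difference (not a benchmark; `box.NULL` stands in
for a remote nil):

```Lua
-- Both forms behave the same here, because netbox leaves no holes in
-- the response array - remote nils come back as box.NULL. Indexing just
-- avoids one function call per collected result.
local r = {box.NULL, 'some error'}
local a, b = unpack(r)     -- extra function call
local c, d = r[1], r[2]    -- plain indexing
assert(a == c and b == d)
```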
> +        if res == nil then
> +            err_uuid = uuid
> +            goto fail
> +        end
> +        bucket_count = bucket_count + res
> +        timeout = deadline - fiber_clock()
> +    end
> +    -- All refs are done but not all buckets are covered. This is odd
> +    -- and can mean many things. The most probable ones: 1) the
> +    -- configuration on the router is outdated and it does not see
> +    -- another replicaset with more buckets, 2) some buckets are simply
> +    -- lost or duplicated - could happen as a bug, or if the user does
> +    -- maintenance of some kind by creating/deleting buckets. In both
> +    -- cases it can't be guaranteed that all the data would be covered
> +    -- by the Map calls.
> +    if bucket_count ~= router.total_bucket_count then
> +        err = lerror.vshard(lerror.code.UNKNOWN_BUCKETS,
> +                            router.total_bucket_count - bucket_count)
> +        goto fail
> +    end
> +    --
> +    -- Map stage: send.
> +    --
> +    args = {'storage_map', rid, func, args}
> +    for uuid, rs in pairs(replicasets) do
> +        res, err = rs:callrw('vshard.storage._call', args, opts_async)
> +        if res == nil then
> +            err_uuid = uuid
> +            goto fail
> +        end
> +        futures[uuid] = res
> +    end
> +    --
> +    -- Map stage: collect.
> +    --
> +    for uuid, f in pairs(futures) do
> +        res, err = f:wait_result(timeout)
> +        if res == nil then
> +            err_uuid = uuid
> +            goto fail
> +        end
> +        -- Map returns true,res or nil,err.
> +        ok, res = unpack(res)
> +        if ok == nil then
> +            err = res
> +            err_uuid = uuid
> +            goto fail
> +        end
> +        if res ~= nil then
> +            -- Store as a table so in future it could be extended for
> +            -- multireturn.
> +            map[uuid] = {res}
> +        end
> +        timeout = deadline - fiber_clock()
> +    end
> +    do return map end
> +
> +::fail::
> +    for uuid, f in pairs(futures) do
> +        f:discard()
> +        -- Best effort to remove the created refs before exiting. Can
> +        -- help if the timeout was big and the error happened early.
> +        f = replicasets[uuid]:callrw('vshard.storage._call',
> +                                     {'storage_unref', rid}, opts_async)
> +        if f ~= nil then
> +            -- Don't care about waiting for a result - no time for this.
> +            -- But it won't affect the request sending if the connection
> +            -- is still alive.
> +            f:discard()
> +        end
> +    end
> +    err = lerror.make(err)
> +    return nil, err, err_uuid
> +end
> +
> +-- Version >= 1.10.
> +else
> +-- Version < 1.10.
> +
> +router_map_callrw = function()
> +    error('Supported for Tarantool >= 1.10')
> +end
> +
> +end
> +
>  --
>  -- Get replicaset object by bucket identifier.
>  -- @param bucket_id Bucket identifier.
> @@ -1268,6 +1444,7 @@ local router_mt = {
>          callrw = router_callrw;
>          callre = router_callre;
>          callbre = router_callbre;
> +        map_callrw = router_map_callrw,
>          route = router_route;
>          routeall = router_routeall;
>          bucket_id = router_bucket_id,
> @@ -1365,6 +1542,9 @@ end
>  if not rawget(_G, MODULE_INTERNALS) then
>      rawset(_G, MODULE_INTERNALS, M)
>  else
> +    if not M.ref_id then
> +        M.ref_id = 0
> +    end
>      for _, router in pairs(M.routers) do
>          router_cfg(router, router.current_cfg, true)
>          setmetatable(router, router_mt)
> diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
> index 31f668f..0a14440 100644
> --- a/vshard/storage/init.lua
> +++ b/vshard/storage/init.lua
> @@ -2415,6 +2415,50 @@ local function storage_call(bucket_id, mode, name, args)
>      return ok, ret1, ret2, ret3
>  end
>
> +--
> +-- Bind a new storage ref to the current box session. Is used as a part
> +-- of the Map-Reduce API.
> +--
> +local function storage_ref(rid, timeout)
> +    local ok, err = lref.add(rid, box.session.id(), timeout)
> +    if not ok then
> +        return nil, err
> +    end
> +    return bucket_count()
> +end
> +
> +--
> +-- Drop a storage ref from the current box session. Is used as a part of
> +-- the Map-Reduce API.
> +--
> +local function storage_unref(rid)
> +    return lref.del(rid, box.session.id())
> +end
> +
> +--
> +-- Execute a user's function under an infinite storage ref protecting
> +-- from bucket moves. The ref should exist before, and is deleted after,
> +-- regardless of the function result. Is used as a part of the
> +-- Map-Reduce API.
> +--
> +local function storage_map(rid, name, args)
> +    local ok, err, res
> +    local sid = box.session.id()
> +    ok, err = lref.use(rid, sid)
> +    if not ok then
> +        return nil, err
> +    end
> +    ok, res = local_call(name, args)
> +    if not ok then
> +        lref.del(rid, sid)
> +        return nil, lerror.make(res)
> +    end
> +    ok, err = lref.del(rid, sid)
> +    if not ok then
> +        return nil, err
> +    end
> +    return true, res
> +end
> +
>  local service_call_api
>
>  local function service_call_test_api(...)
> @@ -2425,6 +2469,9 @@ service_call_api = setmetatable({
>      bucket_recv = bucket_recv,
>      rebalancer_apply_routes = rebalancer_apply_routes,
>      rebalancer_request_state = rebalancer_request_state,
> +    storage_ref = storage_ref,
> +    storage_unref = storage_unref,
> +    storage_map = storage_map,
>      test_api = service_call_test_api,
>  }, {__serialize = function(api)
>      local res = {}
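
As a summary for readers of this thread, here is the storage-side
protocol these three functions implement, condensed from the unit tests
above (console sketch, not part of the patch):

```Lua
-- In one session on a storage with buckets:
local timeout = 10
-- Ref: pin all local buckets; returns the number of pinned buckets.
vshard.storage._call('storage_ref', 0, timeout)
-- Map: run the user's function under the ref; the ref is dropped
-- afterwards regardless of the result.
vshard.storage._call('storage_map', 0, 'echo', {1})
-- Unref: needed only when no map follows the ref.
-- vshard.storage._call('storage_unref', 0)
```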