From mboxrd@z Thu Jan 1 00:00:00 1970
To: tarantool-patches@dev.tarantool.org, olegrok@tarantool.org,
	yaroslav.dynnikov@tarantool.org
Date: Tue, 23 Feb 2021 01:15:39 +0100
X-Mailer: git-send-email 2.24.3 (Apple Git-128)
MIME-Version: 1.0
Content-Transfer-Encoding: 8bit
Subject: [Tarantool-patches] [PATCH vshard 11/11] router: introduce map_callrw()
List-Id: Tarantool development patches
From: Vladislav Shpilevoy via Tarantool-patches
Reply-To: Vladislav Shpilevoy

Closes #147

@TarantoolBot document
Title: vshard.router.map_callrw()

`vshard.router.map_callrw()` implements consistent map-reduce over the
entire cluster. Consistency means that all the data was accessible and
did not move during the execution of the map requests. The function is
useful when you need to access potentially all the data in the cluster,
or simply a huge number of buckets scattered over the instances, for
which individual `vshard.router.call()`s would take too long.

`map_callrw()` takes the name of the function to call on the storages,
its arguments as an array, and an optional options map. The only
supported option for now is `timeout`, which is applied to the entire
call, not to the individual calls for each storage.

```
vshard.router.map_callrw(func_name, args[, {timeout = <seconds>}])
```

The chosen function is called on the master node of each replicaset
with the given arguments. In case of success `vshard.router.map_callrw()`
returns a map with replicaset UUIDs as keys and the results of the
user's function as values, like this:

```
{uuid1 = {res1}, uuid2 = {res2}, ...}
```

If the function returned `nil` or `box.NULL` on one of the storages,
that replicaset won't be present in the result map.

In case of failure the method returns `nil`, an error object, and an
optional UUID of the replicaset where the error happened. The UUID may
not be returned if the error was not about a concrete replicaset. For
instance, the method fails if not all buckets were found, even if all
replicasets were scanned successfully. Handling the result looks like
this:

```Lua
res, err, uuid = vshard.router.map_callrw(...)
if not res then
    -- Error.
    -- 'err' - error object. 'uuid' - optional UUID of the replicaset
    -- where the error happened.
    ...
else
    -- Success.
    for uuid, value in pairs(res) do
        ...
    end
end
```

Map-reduce in vshard works in 3 stages: Ref, Map, Reduce. Ref is an
internal stage whose purpose is to ensure data consistency while the
user's function is executed on all nodes. Reduce is not performed by
vshard - it is whatever the user's code does with the results of
`map_callrw()`.

Consistency, as it is defined for map-reduce, is not compatible with
rebalancing: any bucket move would make the sender and receiver nodes
'inconsistent', since it would no longer be possible to call a function
on them that simply accesses all the data without doing
`vshard.storage.bucket_ref()`. This makes the Ref stage quite
intricate, as it must cooperate with the rebalancer so that neither of
them blocks the other. For this purpose the storage has a scheduler
dedicated to bucket moves and storage refs, which shares storage time
between them fairly.

The definition of fairness depends on how long and how frequent the
moves and refs are. It can be configured using the storage options
`sched_move_quota` and `sched_ref_quota`. See more details about them
in the corresponding doc section. The scheduler configuration may
affect map-reduce requests if they are used a lot during rebalancing.

Keep in mind that it is not a good idea to use too big timeouts for
`map_callrw()`: the router tries to block bucket moves on all storages
for the given timeout, and if something goes wrong, the block remains
in place for the entire timeout. In particular, this means that
timeouts longer than, say, a minute are a very bad idea unless it is
for tests only.
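Putting it together, one full round may look like below. This is a
minimal sketch, not part of the patch: the `test` space with a numeric
second field and the globally defined `sum_rows()` function are
assumptions made only for the example, and must exist on every storage.

```Lua
-- On every storage: the map function. It is executed under a storage
-- ref, so it can safely scan all local buckets without taking bucket
-- refs itself.
function sum_rows()
    local total = 0
    for _, tuple in box.space.test:pairs() do
        total = total + tuple[2]
    end
    return total
end
```

```Lua
-- On the router: vshard does Ref and Map, the loop below is the Reduce.
local res, err, uuid = vshard.router.map_callrw('sum_rows', {}, {timeout = 10})
if not res then
    error(('sum_rows failed%s: %s'):format(
        uuid and (' on replicaset ' .. uuid) or '', err.message))
end
local grand_total = 0
for _, ret in pairs(res) do
    -- Each value is an array wrapping the function's return: {res}.
    grand_total = grand_total + ret[1]
end
```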
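To make the scheduler tuning above concrete, here is a hypothetical
storage configuration fragment. The values are purely illustrative,
not recommendations; the exact semantics of the quotas are described
in the corresponding doc section.

```Lua
local cfg = {
    -- How many storage refs may be served in a row while bucket moves
    -- are waiting. A bigger value favors map-reduce latency.
    sched_ref_quota = 300,
    -- How many bucket moves may be served in a row while refs are
    -- waiting. A bigger value favors rebalancing speed.
    sched_move_quota = 1,
    -- The usual 'sharding', 'bucket_count', etc. options go here too.
}
vshard.storage.cfg(cfg, box.info.uuid)
```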
Also it is important to remember that `map_callrw()` does not work on replicas. It works only on masters. This makes it unusable if at least one replicaset has its master node down. --- test/router/map-reduce.result | 636 ++++++++++++++++++++++++++++++++ test/router/map-reduce.test.lua | 258 +++++++++++++ test/router/router.result | 9 +- test/upgrade/upgrade.result | 5 +- vshard/replicaset.lua | 34 ++ vshard/router/init.lua | 180 +++++++++ vshard/storage/init.lua | 47 +++ 7 files changed, 1164 insertions(+), 5 deletions(-) create mode 100644 test/router/map-reduce.result create mode 100644 test/router/map-reduce.test.lua diff --git a/test/router/map-reduce.result b/test/router/map-reduce.result new file mode 100644 index 0000000..1e8995a --- /dev/null +++ b/test/router/map-reduce.result @@ -0,0 +1,636 @@ +-- test-run result file version 2 +test_run = require('test_run').new() + | --- + | ... +REPLICASET_1 = { 'storage_1_a', 'storage_1_b' } + | --- + | ... +REPLICASET_2 = { 'storage_2_a', 'storage_2_b' } + | --- + | ... +test_run:create_cluster(REPLICASET_1, 'router') + | --- + | ... +test_run:create_cluster(REPLICASET_2, 'router') + | --- + | ... +util = require('util') + | --- + | ... +util.wait_master(test_run, REPLICASET_1, 'storage_1_a') + | --- + | ... +util.wait_master(test_run, REPLICASET_2, 'storage_2_a') + | --- + | ... +util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, 'bootstrap_storage()') + | --- + | ... +util.push_rs_filters(test_run) + | --- + | ... +_ = test_run:cmd("create server router_1 with script='router/router_1.lua'") + | --- + | ... +_ = test_run:cmd("start server router_1") + | --- + | ... + +_ = test_run:switch("router_1") + | --- + | ... +util = require('util') + | --- + | ... + +-- +-- gh-147: consistent map-reduce. +-- +big_timeout = 1000000 + | --- + | ... +big_timeout_opts = {timeout = big_timeout} + | --- + | ... +vshard.router.cfg(cfg) + | --- + | ... +vshard.router.bootstrap(big_timeout_opts) + | --- + | - true + | ... +-- Trivial basic sanity test. Multireturn is not supported, should be truncated. +vshard.router.map_callrw('echo', {1, 2, 3}, big_timeout_opts) + | --- + | - : + | - 1 + | : + | - 1 + | ... + +-- +-- Fail during connecting to storages. For the succeeded storages the router +-- tries to send unref. +-- +timeout = 0.001 + | --- + | ... +timeout_opts = {timeout = timeout} + | --- + | ... + +test_run:cmd('stop server storage_1_a') + | --- + | - true + | ... +ok, err = vshard.router.map_callrw('echo', {1}, timeout_opts) + | --- + | ... +assert(not ok and err.message) + | --- + | - Timeout exceeded + | ... +-- Even if ref was sent successfully to storage_2_a, it was deleted before +-- router returned an error. +_ = test_run:switch('storage_2_a') + | --- + | ... +lref = require('vshard.storage.ref') + | --- + | ... +-- Wait because unref is sent asynchronously. Could arrive not immediately. +test_run:wait_cond(function() return lref.count == 0 end) + | --- + | - true + | ... + +_ = test_run:switch('router_1') + | --- + | ... +test_run:cmd('start server storage_1_a') + | --- + | - true + | ... +-- Works again - router waited for connection being established. +vshard.router.map_callrw('echo', {1}, big_timeout_opts) + | --- + | - : + | - 1 + | : + | - 1 + | ... + +-- +-- Do all the same but with another storage being stopped. The same test is done +-- again because can't tell at which of the tests to where the router will go +-- first. +-- +test_run:cmd('stop server storage_2_a') + | --- + | - true + | ... 
+ok, err = vshard.router.map_callrw('echo', {1}, timeout_opts) + | --- + | ... +assert(not ok and err.message) + | --- + | - Timeout exceeded + | ... +_ = test_run:switch('storage_1_a') + | --- + | ... +lref = require('vshard.storage.ref') + | --- + | ... +test_run:wait_cond(function() return lref.count == 0 end) + | --- + | - true + | ... + +_ = test_run:switch('router_1') + | --- + | ... +test_run:cmd('start server storage_2_a') + | --- + | - true + | ... +vshard.router.map_callrw('echo', {1}, big_timeout_opts) + | --- + | - : + | - 1 + | : + | - 1 + | ... + +-- +-- Fail at ref stage handling. Unrefs are sent to cancel those refs which +-- succeeded. To simulate a ref fail make the router think there is a moving +-- bucket. +-- +_ = test_run:switch('storage_1_a') + | --- + | ... +lsched = require('vshard.storage.sched') + | --- + | ... +big_timeout = 1000000 + | --- + | ... +lsched.move_start(big_timeout) + | --- + | - 1000000 + | ... + +_ = test_run:switch('router_1') + | --- + | ... +ok, err = vshard.router.map_callrw('echo', {1}, timeout_opts) + | --- + | ... +assert(not ok and err.message) + | --- + | - Timeout exceeded + | ... + +_ = test_run:switch('storage_2_a') + | --- + | ... +lsched = require('vshard.storage.sched') + | --- + | ... +lref = require('vshard.storage.ref') + | --- + | ... +test_run:wait_cond(function() return lref.count == 0 end) + | --- + | - true + | ... + +-- +-- Do all the same with another storage being busy with a 'move'. +-- +big_timeout = 1000000 + | --- + | ... +lsched.move_start(big_timeout) + | --- + | - 1000000 + | ... + +_ = test_run:switch('storage_1_a') + | --- + | ... +lref = require('vshard.storage.ref') + | --- + | ... +lsched.move_end(1) + | --- + | ... +assert(lref.count == 0) + | --- + | - true + | ... + +_ = test_run:switch('router_1') + | --- + | ... +ok, err = vshard.router.map_callrw('echo', {1}, timeout_opts) + | --- + | ... +assert(not ok and err.message) + | --- + | - Timeout exceeded + | ... + +_ = test_run:switch('storage_1_a') + | --- + | ... +test_run:wait_cond(function() return lref.count == 0 end) + | --- + | - true + | ... + +_ = test_run:switch('storage_2_a') + | --- + | ... +lsched.move_end(1) + | --- + | ... +assert(lref.count == 0) + | --- + | - true + | ... + +_ = test_run:switch('router_1') + | --- + | ... +vshard.router.map_callrw('echo', {1}, big_timeout_opts) + | --- + | - : + | - 1 + | : + | - 1 + | ... + +-- +-- Ref can fail earlier than by a timeout. Router still should broadcast unrefs +-- correctly. To simulate ref fail add a duplicate manually. +-- +_ = test_run:switch('storage_1_a') + | --- + | ... +box.schema.user.grant('storage', 'super') + | --- + | ... +router_sid = nil + | --- + | ... +function save_router_sid() \ + router_sid = box.session.id() \ +end + | --- + | ... + +_ = test_run:switch('storage_2_a') + | --- + | ... +box.schema.user.grant('storage', 'super') + | --- + | ... +router_sid = nil + | --- + | ... +function save_router_sid() \ + router_sid = box.session.id() \ +end + | --- + | ... + +_ = test_run:switch('router_1') + | --- + | ... +vshard.router.map_callrw('save_router_sid', {}, big_timeout_opts) + | --- + | - [] + | ... + +_ = test_run:switch('storage_1_a') + | --- + | ... +lref.add(1, router_sid, big_timeout) + | --- + | - true + | ... + +_ = test_run:switch('router_1') + | --- + | ... +vshard.router.internal.ref_id = 1 + | --- + | ... +ok, err = vshard.router.map_callrw('echo', {1}, big_timeout_opts) + | --- + | ... 
+assert(not ok and err.message) + | --- + | - 'Can not add a storage ref: duplicate ref' + | ... + +_ = test_run:switch('storage_1_a') + | --- + | ... +_ = lref.del(1, router_sid) + | --- + | ... + +_ = test_run:switch('storage_2_a') + | --- + | ... +test_run:wait_cond(function() return lref.count == 0 end) + | --- + | - true + | ... +lref.add(1, router_sid, big_timeout) + | --- + | - true + | ... + +_ = test_run:switch('router_1') + | --- + | ... +vshard.router.internal.ref_id = 1 + | --- + | ... +ok, err = vshard.router.map_callrw('echo', {1}, big_timeout_opts) + | --- + | ... +assert(not ok and err.message) + | --- + | - 'Can not add a storage ref: duplicate ref' + | ... + +_ = test_run:switch('storage_2_a') + | --- + | ... +_ = lref.del(1, router_sid) + | --- + | ... + +_ = test_run:switch('storage_1_a') + | --- + | ... +test_run:wait_cond(function() return lref.count == 0 end) + | --- + | - true + | ... + +-- +-- Fail if some buckets are not visible. Even if all the known replicasets were +-- scanned. It means consistency violation. +-- +_ = test_run:switch('storage_1_a') + | --- + | ... +bucket_id = box.space._bucket.index.pk:min().id + | --- + | ... +vshard.storage.bucket_force_drop(bucket_id) + | --- + | - true + | ... + +_ = test_run:switch('router_1') + | --- + | ... +ok, err = vshard.router.map_callrw('echo', {1}, big_timeout_opts) + | --- + | ... +assert(not ok and err.message) + | --- + | - 1 buckets are not discovered + | ... + +_ = test_run:switch('storage_1_a') + | --- + | ... +test_run:wait_cond(function() return lref.count == 0 end) + | --- + | - true + | ... +vshard.storage.bucket_force_create(bucket_id) + | --- + | - true + | ... + +_ = test_run:switch('storage_2_a') + | --- + | ... +test_run:wait_cond(function() return lref.count == 0 end) + | --- + | - true + | ... +bucket_id = box.space._bucket.index.pk:min().id + | --- + | ... +vshard.storage.bucket_force_drop(bucket_id) + | --- + | - true + | ... + +_ = test_run:switch('router_1') + | --- + | ... +ok, err = vshard.router.map_callrw('echo', {1}, big_timeout_opts) + | --- + | ... +assert(not ok and err.message) + | --- + | - 1 buckets are not discovered + | ... + +_ = test_run:switch('storage_2_a') + | --- + | ... +test_run:wait_cond(function() return lref.count == 0 end) + | --- + | - true + | ... +vshard.storage.bucket_force_create(bucket_id) + | --- + | - true + | ... + +_ = test_run:switch('storage_1_a') + | --- + | ... +test_run:wait_cond(function() return lref.count == 0 end) + | --- + | - true + | ... + +-- +-- Storage map unit tests. +-- + +-- Map fails not being able to use the ref. +ok, err = vshard.storage._call('storage_map', 0, 'echo', {1}) + | --- + | ... +ok, err.message + | --- + | - null + | - 'Can not use a storage ref: no session' + | ... + +-- Map fails and clears the ref when the user function fails. +vshard.storage._call('storage_ref', 0, big_timeout) + | --- + | - 1500 + | ... +assert(lref.count == 1) + | --- + | - true + | ... +ok, err = vshard.storage._call('storage_map', 0, 'raise_client_error', {}) + | --- + | ... +assert(lref.count == 0) + | --- + | - true + | ... +assert(not ok and err.message) + | --- + | - Unknown error + | ... + +-- Map fails gracefully when couldn't delete the ref. +vshard.storage._call('storage_ref', 0, big_timeout) + | --- + | - 1500 + | ... +ok, err = vshard.storage._call('storage_map', 0, 'vshard.storage._call', \ + {'storage_unref', 0}) + | --- + | ... +assert(lref.count == 0) + | --- + | - true + | ... 
+assert(not ok and err.message) + | --- + | - 'Can not delete a storage ref: no ref' + | ... + +-- +-- Map fail is handled and the router tries to send unrefs. +-- +_ = test_run:switch('storage_1_a') + | --- + | ... +need_throw = true + | --- + | ... +function map_throw() \ + if need_throw then \ + raise_client_error() \ + end \ + return '+' \ +end + | --- + | ... + +_ = test_run:switch('storage_2_a') + | --- + | ... +need_throw = false + | --- + | ... +function map_throw() \ + if need_throw then \ + raise_client_error() \ + end \ + return '+' \ +end + | --- + | ... + +_ = test_run:switch('router_1') + | --- + | ... +ok, err = vshard.router.map_callrw('raise_client_error', {}, big_timeout_opts) + | --- + | ... +ok, err.message + | --- + | - null + | - Unknown error + | ... + +_ = test_run:switch('storage_1_a') + | --- + | ... +need_throw = false + | --- + | ... +test_run:wait_cond(function() return lref.count == 0 end) + | --- + | - true + | ... + +_ = test_run:switch('storage_2_a') + | --- + | ... +need_throw = true + | --- + | ... +test_run:wait_cond(function() return lref.count == 0 end) + | --- + | - true + | ... + +_ = test_run:switch('router_1') + | --- + | ... +ok, err = vshard.router.map_callrw('raise_client_error', {}, big_timeout_opts) + | --- + | ... +ok, err.message + | --- + | - null + | - Unknown error + | ... + +_ = test_run:switch('storage_1_a') + | --- + | ... +test_run:wait_cond(function() return lref.count == 0 end) + | --- + | - true + | ... + +_ = test_run:switch('storage_2_a') + | --- + | ... +test_run:wait_cond(function() return lref.count == 0 end) + | --- + | - true + | ... + +_ = test_run:switch('default') + | --- + | ... +_ = test_run:cmd("stop server router_1") + | --- + | ... +_ = test_run:cmd("cleanup server router_1") + | --- + | ... +test_run:drop_cluster(REPLICASET_1) + | --- + | ... +test_run:drop_cluster(REPLICASET_2) + | --- + | ... +_ = test_run:cmd('clear filter') + | --- + | ... diff --git a/test/router/map-reduce.test.lua b/test/router/map-reduce.test.lua new file mode 100644 index 0000000..3b63248 --- /dev/null +++ b/test/router/map-reduce.test.lua @@ -0,0 +1,258 @@ +test_run = require('test_run').new() +REPLICASET_1 = { 'storage_1_a', 'storage_1_b' } +REPLICASET_2 = { 'storage_2_a', 'storage_2_b' } +test_run:create_cluster(REPLICASET_1, 'router') +test_run:create_cluster(REPLICASET_2, 'router') +util = require('util') +util.wait_master(test_run, REPLICASET_1, 'storage_1_a') +util.wait_master(test_run, REPLICASET_2, 'storage_2_a') +util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, 'bootstrap_storage()') +util.push_rs_filters(test_run) +_ = test_run:cmd("create server router_1 with script='router/router_1.lua'") +_ = test_run:cmd("start server router_1") + +_ = test_run:switch("router_1") +util = require('util') + +-- +-- gh-147: consistent map-reduce. +-- +big_timeout = 1000000 +big_timeout_opts = {timeout = big_timeout} +vshard.router.cfg(cfg) +vshard.router.bootstrap(big_timeout_opts) +-- Trivial basic sanity test. Multireturn is not supported, should be truncated. +vshard.router.map_callrw('echo', {1, 2, 3}, big_timeout_opts) + +-- +-- Fail during connecting to storages. For the succeeded storages the router +-- tries to send unref. +-- +timeout = 0.001 +timeout_opts = {timeout = timeout} + +test_run:cmd('stop server storage_1_a') +ok, err = vshard.router.map_callrw('echo', {1}, timeout_opts) +assert(not ok and err.message) +-- Even if ref was sent successfully to storage_2_a, it was deleted before +-- router returned an error. 
+_ = test_run:switch('storage_2_a') +lref = require('vshard.storage.ref') +-- Wait because unref is sent asynchronously. Could arrive not immediately. +test_run:wait_cond(function() return lref.count == 0 end) + +_ = test_run:switch('router_1') +test_run:cmd('start server storage_1_a') +-- Works again - router waited for connection being established. +vshard.router.map_callrw('echo', {1}, big_timeout_opts) + +-- +-- Do all the same but with another storage being stopped. The same test is done +-- again because can't tell at which of the tests to where the router will go +-- first. +-- +test_run:cmd('stop server storage_2_a') +ok, err = vshard.router.map_callrw('echo', {1}, timeout_opts) +assert(not ok and err.message) +_ = test_run:switch('storage_1_a') +lref = require('vshard.storage.ref') +test_run:wait_cond(function() return lref.count == 0 end) + +_ = test_run:switch('router_1') +test_run:cmd('start server storage_2_a') +vshard.router.map_callrw('echo', {1}, big_timeout_opts) + +-- +-- Fail at ref stage handling. Unrefs are sent to cancel those refs which +-- succeeded. To simulate a ref fail make the router think there is a moving +-- bucket. +-- +_ = test_run:switch('storage_1_a') +lsched = require('vshard.storage.sched') +big_timeout = 1000000 +lsched.move_start(big_timeout) + +_ = test_run:switch('router_1') +ok, err = vshard.router.map_callrw('echo', {1}, timeout_opts) +assert(not ok and err.message) + +_ = test_run:switch('storage_2_a') +lsched = require('vshard.storage.sched') +lref = require('vshard.storage.ref') +test_run:wait_cond(function() return lref.count == 0 end) + +-- +-- Do all the same with another storage being busy with a 'move'. +-- +big_timeout = 1000000 +lsched.move_start(big_timeout) + +_ = test_run:switch('storage_1_a') +lref = require('vshard.storage.ref') +lsched.move_end(1) +assert(lref.count == 0) + +_ = test_run:switch('router_1') +ok, err = vshard.router.map_callrw('echo', {1}, timeout_opts) +assert(not ok and err.message) + +_ = test_run:switch('storage_1_a') +test_run:wait_cond(function() return lref.count == 0 end) + +_ = test_run:switch('storage_2_a') +lsched.move_end(1) +assert(lref.count == 0) + +_ = test_run:switch('router_1') +vshard.router.map_callrw('echo', {1}, big_timeout_opts) + +-- +-- Ref can fail earlier than by a timeout. Router still should broadcast unrefs +-- correctly. To simulate ref fail add a duplicate manually. 
+-- +_ = test_run:switch('storage_1_a') +box.schema.user.grant('storage', 'super') +router_sid = nil +function save_router_sid() \ + router_sid = box.session.id() \ +end + +_ = test_run:switch('storage_2_a') +box.schema.user.grant('storage', 'super') +router_sid = nil +function save_router_sid() \ + router_sid = box.session.id() \ +end + +_ = test_run:switch('router_1') +vshard.router.map_callrw('save_router_sid', {}, big_timeout_opts) + +_ = test_run:switch('storage_1_a') +lref.add(1, router_sid, big_timeout) + +_ = test_run:switch('router_1') +vshard.router.internal.ref_id = 1 +ok, err = vshard.router.map_callrw('echo', {1}, big_timeout_opts) +assert(not ok and err.message) + +_ = test_run:switch('storage_1_a') +_ = lref.del(1, router_sid) + +_ = test_run:switch('storage_2_a') +test_run:wait_cond(function() return lref.count == 0 end) +lref.add(1, router_sid, big_timeout) + +_ = test_run:switch('router_1') +vshard.router.internal.ref_id = 1 +ok, err = vshard.router.map_callrw('echo', {1}, big_timeout_opts) +assert(not ok and err.message) + +_ = test_run:switch('storage_2_a') +_ = lref.del(1, router_sid) + +_ = test_run:switch('storage_1_a') +test_run:wait_cond(function() return lref.count == 0 end) + +-- +-- Fail if some buckets are not visible. Even if all the known replicasets were +-- scanned. It means consistency violation. +-- +_ = test_run:switch('storage_1_a') +bucket_id = box.space._bucket.index.pk:min().id +vshard.storage.bucket_force_drop(bucket_id) + +_ = test_run:switch('router_1') +ok, err = vshard.router.map_callrw('echo', {1}, big_timeout_opts) +assert(not ok and err.message) + +_ = test_run:switch('storage_1_a') +test_run:wait_cond(function() return lref.count == 0 end) +vshard.storage.bucket_force_create(bucket_id) + +_ = test_run:switch('storage_2_a') +test_run:wait_cond(function() return lref.count == 0 end) +bucket_id = box.space._bucket.index.pk:min().id +vshard.storage.bucket_force_drop(bucket_id) + +_ = test_run:switch('router_1') +ok, err = vshard.router.map_callrw('echo', {1}, big_timeout_opts) +assert(not ok and err.message) + +_ = test_run:switch('storage_2_a') +test_run:wait_cond(function() return lref.count == 0 end) +vshard.storage.bucket_force_create(bucket_id) + +_ = test_run:switch('storage_1_a') +test_run:wait_cond(function() return lref.count == 0 end) + +-- +-- Storage map unit tests. +-- + +-- Map fails not being able to use the ref. +ok, err = vshard.storage._call('storage_map', 0, 'echo', {1}) +ok, err.message + +-- Map fails and clears the ref when the user function fails. +vshard.storage._call('storage_ref', 0, big_timeout) +assert(lref.count == 1) +ok, err = vshard.storage._call('storage_map', 0, 'raise_client_error', {}) +assert(lref.count == 0) +assert(not ok and err.message) + +-- Map fails gracefully when couldn't delete the ref. +vshard.storage._call('storage_ref', 0, big_timeout) +ok, err = vshard.storage._call('storage_map', 0, 'vshard.storage._call', \ + {'storage_unref', 0}) +assert(lref.count == 0) +assert(not ok and err.message) + +-- +-- Map fail is handled and the router tries to send unrefs. 
+-- +_ = test_run:switch('storage_1_a') +need_throw = true +function map_throw() \ + if need_throw then \ + raise_client_error() \ + end \ + return '+' \ +end + +_ = test_run:switch('storage_2_a') +need_throw = false +function map_throw() \ + if need_throw then \ + raise_client_error() \ + end \ + return '+' \ +end + +_ = test_run:switch('router_1') +ok, err = vshard.router.map_callrw('raise_client_error', {}, big_timeout_opts) +ok, err.message + +_ = test_run:switch('storage_1_a') +need_throw = false +test_run:wait_cond(function() return lref.count == 0 end) + +_ = test_run:switch('storage_2_a') +need_throw = true +test_run:wait_cond(function() return lref.count == 0 end) + +_ = test_run:switch('router_1') +ok, err = vshard.router.map_callrw('raise_client_error', {}, big_timeout_opts) +ok, err.message + +_ = test_run:switch('storage_1_a') +test_run:wait_cond(function() return lref.count == 0 end) + +_ = test_run:switch('storage_2_a') +test_run:wait_cond(function() return lref.count == 0 end) + +_ = test_run:switch('default') +_ = test_run:cmd("stop server router_1") +_ = test_run:cmd("cleanup server router_1") +test_run:drop_cluster(REPLICASET_1) +test_run:drop_cluster(REPLICASET_2) +_ = test_run:cmd('clear filter') \ No newline at end of file diff --git a/test/router/router.result b/test/router/router.result index 3c1d073..f9ee37c 100644 --- a/test/router/router.result +++ b/test/router/router.result @@ -1163,14 +1163,15 @@ error_messages - - Use replicaset:callro(...) instead of replicaset.callro(...) - Use replicaset:connect_master(...) instead of replicaset.connect_master(...) - Use replicaset:callre(...) instead of replicaset.callre(...) - - Use replicaset:connect_replica(...) instead of replicaset.connect_replica(...) - Use replicaset:down_replica_priority(...) instead of replicaset.down_replica_priority(...) - - Use replicaset:callrw(...) instead of replicaset.callrw(...) + - Use replicaset:connect(...) instead of replicaset.connect(...) + - Use replicaset:wait_connected(...) instead of replicaset.wait_connected(...) + - Use replicaset:up_replica_priority(...) instead of replicaset.up_replica_priority(...) - Use replicaset:callbro(...) instead of replicaset.callbro(...) - Use replicaset:connect_all(...) instead of replicaset.connect_all(...) + - Use replicaset:connect_replica(...) instead of replicaset.connect_replica(...) - Use replicaset:call(...) instead of replicaset.call(...) - - Use replicaset:connect(...) instead of replicaset.connect(...) - - Use replicaset:up_replica_priority(...) instead of replicaset.up_replica_priority(...) + - Use replicaset:callrw(...) instead of replicaset.callrw(...) - Use replicaset:callbre(...) instead of replicaset.callbre(...) ... 
 _, replica = next(replicaset.replicas)
diff --git a/test/upgrade/upgrade.result b/test/upgrade/upgrade.result
index c2d54a3..833da3f 100644
--- a/test/upgrade/upgrade.result
+++ b/test/upgrade/upgrade.result
@@ -162,9 +162,12 @@ vshard.storage._call ~= nil
 vshard.storage._call('test_api', 1, 2, 3)
  | ---
  | - bucket_recv: true
+ | storage_ref: true
  | rebalancer_apply_routes: true
- | test_api: true
+ | storage_map: true
  | rebalancer_request_state: true
+ | test_api: true
+ | storage_unref: true
  | - 1
  | - 2
  | - 3
diff --git a/vshard/replicaset.lua b/vshard/replicaset.lua
index 7437e3b..56ea165 100644
--- a/vshard/replicaset.lua
+++ b/vshard/replicaset.lua
@@ -139,6 +139,39 @@ local function replicaset_connect_master(replicaset)
     return replicaset_connect_to_replica(replicaset, master)
 end
 
+--
+-- Wait until the master instance is connected. This is necessary at least for
+-- async requests because they fail immediately if the connection is not
+-- established.
+-- Returns the remaining timeout because it is expected to be used to connect
+-- to many replicasets in a loop, where such return saves one clock get in the
+-- caller code and is just cleaner code.
+--
+local function replicaset_wait_connected(replicaset, timeout)
+    local deadline = fiber_clock() + timeout
+    local ok, res
+    while true do
+        local conn = replicaset_connect_master(replicaset)
+        if conn.state == 'active' then
+            return timeout
+        end
+        -- Netbox uses fiber_cond inside, which throws an irrelevant usage
+        -- error at negative timeout. Need to check the case manually.
+        if timeout < 0 then
+            return nil, lerror.timeout()
+        end
+        ok, res = pcall(conn.wait_connected, conn, timeout)
+        if not ok then
+            return nil, lerror.make(res)
+        end
+        if not res then
+            return nil, lerror.timeout()
+        end
+        timeout = deadline - fiber_clock()
+    end
+    assert(false)
+end
+
 --
 -- Create net.box connections to all replicas and master.
 --
@@ -483,6 +516,7 @@ local replicaset_mt = {
         connect_replica = replicaset_connect_to_replica;
         down_replica_priority = replicaset_down_replica_priority;
         up_replica_priority = replicaset_up_replica_priority;
+        wait_connected = replicaset_wait_connected,
         call = replicaset_master_call;
         callrw = replicaset_master_call;
         callro = replicaset_template_multicallro(false, false);
diff --git a/vshard/router/init.lua b/vshard/router/init.lua
index 97bcb0a..8abd77f 100644
--- a/vshard/router/init.lua
+++ b/vshard/router/init.lua
@@ -44,6 +44,11 @@ if not M then
         module_version = 0,
         -- Number of router which require collecting lua garbage.
         collect_lua_garbage_cnt = 0,
+
+        ----------------------- Map-Reduce -----------------------
+        -- Storage Ref ID. It must be unique for each ref request
+        -- and therefore is global and monotonically growing.
+        ref_id = 0,
     }
 end
 
@@ -674,6 +679,177 @@ local function router_call(router, bucket_id, opts, ...)
                             ...)
 end
 
+local router_map_callrw
+
+if util.version_is_at_least(1, 10, 0) then
+--
+-- Consistent Map-Reduce. The given function is called on all masters in the
+-- cluster with a guarantee that in case of success it was executed with all
+-- buckets being accessible for reads and writes.
+--
+-- Consistency in scope of map-reduce means all the data was accessible, and
+-- didn't move during map requests execution. To preserve the consistency there
+-- is a third stage - Ref. So the algorithm is actually Ref-Map-Reduce.
+--
+-- Refs are broadcast before the Map stage to pin the buckets to their
+-- storages, and ensure they won't move until the maps are done.
+--
+-- Map requests are broadcast in case all refs are done successfully. They
+-- execute the user function + delete the refs to enable rebalancing again.
+--
+-- On the storages there are additional means to ensure map-reduces don't block
+-- rebalancing forever and vice versa.
+--
+-- The function is not as slow as it may seem - it uses netbox's feature
+-- is_async to send refs and maps in parallel. So the cost of the function is
+-- about 2 network exchanges to the farthest storage in terms of time.
+--
+-- @param router Router instance to use.
+-- @param func Name of the function to call.
+-- @param args Function arguments passed in netbox style (as an array).
+-- @param opts Can only contain 'timeout' as a number of seconds. Note that the
+--     refs may end up being kept on the storages during this entire timeout if
+--     something goes wrong. For instance, network issues appear. This means
+--     better not use a value bigger than necessary. A stuck infinite ref can
+--     only be dropped by this router restart/reconnect or the storage restart.
+--
+-- @return In case of success - a map with replicaset UUID keys and values
+--     being what the function returned from the replicaset.
+--
+-- @return In case of an error - nil, error object, optional UUID of the
+--     replicaset where the error happened. UUID may not be present if the
+--     error wasn't about a concrete replicaset. For example, not all buckets
+--     were found even though all replicasets were scanned.
+--
+router_map_callrw = function(router, func, args, opts)
+    local replicasets = router.replicasets
+    local timeout = opts and opts.timeout or consts.CALL_TIMEOUT_MIN
+    local deadline = fiber_clock() + timeout
+    local err, err_uuid, res, ok, map
+    local futures = {}
+    local bucket_count = 0
+    local opts_async = {is_async = true}
+    local rs_count = 0
+    local rid = M.ref_id
+    M.ref_id = rid + 1
+    -- Nil checks are done explicitly here (== nil instead of 'not'), because
+    -- netbox requests return box.NULL instead of nils.
+
+    --
+    -- Ref stage: send.
+    --
+    for uuid, rs in pairs(replicasets) do
+        -- Netbox async requests work only with active connections. Need to
+        -- wait for the connection explicitly.
+        timeout, err = rs:wait_connected(timeout)
+        if timeout == nil then
+            err_uuid = uuid
+            goto fail
+        end
+        res, err = rs:callrw('vshard.storage._call',
+                             {'storage_ref', rid, timeout}, opts_async)
+        if res == nil then
+            err_uuid = uuid
+            goto fail
+        end
+        futures[uuid] = res
+        rs_count = rs_count + 1
+    end
+    map = table_new(0, rs_count)
+    --
+    -- Ref stage: collect.
+    --
+    for uuid, future in pairs(futures) do
+        res, err = future:wait_result(timeout)
+        -- Handle netbox error first.
+        if res == nil then
+            err_uuid = uuid
+            goto fail
+        end
+        -- Ref returns nil,err or bucket count.
+        res, err = unpack(res)
+        if res == nil then
+            err_uuid = uuid
+            goto fail
+        end
+        bucket_count = bucket_count + res
+        timeout = deadline - fiber_clock()
+    end
+    -- All refs are done but not all buckets are covered. This is odd and can
+    -- mean many things. The most likely ones: 1) outdated configuration on
+    -- the router and it does not see another replicaset with more buckets,
+    -- 2) some buckets are simply lost or duplicated - could happen as a bug,
+    -- or if the user does a maintenance of some kind by creating/deleting
+    -- buckets. In both cases we can't guarantee all the data would be covered
+    -- by Map calls.
+    if bucket_count ~= router.total_bucket_count then
+        err = lerror.vshard(lerror.code.UNKNOWN_BUCKETS,
+                            router.total_bucket_count - bucket_count)
+        goto fail
+    end
+    --
+    -- Map stage: send.
+    --
+    args = {'storage_map', rid, func, args}
+    for uuid, rs in pairs(replicasets) do
+        res, err = rs:callrw('vshard.storage._call', args, opts_async)
+        if res == nil then
+            err_uuid = uuid
+            goto fail
+        end
+        futures[uuid] = res
+    end
+    --
+    -- Map stage: collect.
+    --
+    for uuid, f in pairs(futures) do
+        res, err = f:wait_result(timeout)
+        if res == nil then
+            err_uuid = uuid
+            goto fail
+        end
+        -- Map returns true,res or nil,err.
+        ok, res = unpack(res)
+        if ok == nil then
+            err = res
+            err_uuid = uuid
+            goto fail
+        end
+        if res ~= nil then
+            -- Store as a table so in future it could be extended for
+            -- multireturn.
+            map[uuid] = {res}
+        end
+        timeout = deadline - fiber_clock()
+    end
+    do return map end
+
+::fail::
+    for uuid, f in pairs(futures) do
+        f:discard()
+        -- Best effort to remove the created refs before exiting. Can help if
+        -- the timeout was big and the error happened early.
+        f = replicasets[uuid]:callrw('vshard.storage._call',
+                                     {'storage_unref', rid}, opts_async)
+        if f ~= nil then
+            -- Don't care waiting for a result - no time for this. But it won't
+            -- affect the request sending if the connection is still alive.
+            f:discard()
+        end
+    end
+    err = lerror.make(err)
+    return nil, err, err_uuid
+end
+
+-- Version >= 1.10.
+else
+-- Version < 1.10.
+
+router_map_callrw = function()
+    error('Supported for Tarantool >= 1.10')
+end
+
+end
+
 --
 -- Get replicaset object by bucket identifier.
 -- @param bucket_id Bucket identifier.
@@ -1268,6 +1444,7 @@ local router_mt = {
         callrw = router_callrw;
         callre = router_callre;
         callbre = router_callbre;
+        map_callrw = router_map_callrw,
         route = router_route;
         routeall = router_routeall;
         bucket_id = router_bucket_id,
@@ -1365,6 +1542,9 @@ end
 if not rawget(_G, MODULE_INTERNALS) then
     rawset(_G, MODULE_INTERNALS, M)
 else
+    if not M.ref_id then
+        M.ref_id = 0
+    end
     for _, router in pairs(M.routers) do
         router_cfg(router, router.current_cfg, true)
         setmetatable(router, router_mt)
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index 31f668f..0a14440 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -2415,6 +2415,50 @@ local function storage_call(bucket_id, mode, name, args)
     return ok, ret1, ret2, ret3
 end
 
+--
+-- Bind a new storage ref to the current box session. Is used as a part of
+-- Map-Reduce API.
+--
+local function storage_ref(rid, timeout)
+    local ok, err = lref.add(rid, box.session.id(), timeout)
+    if not ok then
+        return nil, err
+    end
+    return bucket_count()
+end
+
+--
+-- Drop a storage ref from the current box session. Is used as a part of
+-- Map-Reduce API.
+--
+local function storage_unref(rid)
+    return lref.del(rid, box.session.id())
+end
+
+--
+-- Execute a user's function under an infinite storage ref protecting from
+-- bucket moves. The ref should exist before, and is deleted after, regardless
+-- of the function result. Is used as a part of Map-Reduce API.
+--
+local function storage_map(rid, name, args)
+    local ok, err, res
+    local sid = box.session.id()
+    ok, err = lref.use(rid, sid)
+    if not ok then
+        return nil, err
+    end
+    ok, res = local_call(name, args)
+    if not ok then
+        lref.del(rid, sid)
+        return nil, lerror.make(res)
+    end
+    ok, err = lref.del(rid, sid)
+    if not ok then
+        return nil, err
+    end
+    return true, res
+end
+
 local service_call_api
 
 local function service_call_test_api(...)
@@ -2425,6 +2469,9 @@ service_call_api = setmetatable({
     bucket_recv = bucket_recv,
     rebalancer_apply_routes = rebalancer_apply_routes,
     rebalancer_request_state = rebalancer_request_state,
+    storage_ref = storage_ref,
+    storage_unref = storage_unref,
+    storage_map = storage_map,
     test_api = service_call_test_api,
 }, {__serialize = function(api)
     local res = {}
-- 
2.24.3 (Apple Git-128)
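For completeness, below is the storage-side conversation the router drives
through the new `vshard.storage._call()` service API, written out
sequentially for a single master. This is only a sketch: `conn` is a
hypothetical established net.box connection, `user_func` is a placeholder,
and the real `map_callrw()` sends these requests to all replicasets in
parallel using `is_async = true`.

```Lua
local timeout = 10
local rid = 1 -- router-global, monotonically growing ref ID

-- Ref stage: pin all buckets on the storage until the map is done. On
-- success it returns the local bucket count; the router sums the counts
-- and requires the total to equal total_bucket_count before mapping.
local count, err = conn:call('vshard.storage._call',
                             {'storage_ref', rid, timeout})

-- Map stage: execute the user's function under the ref. The storage
-- deletes the ref itself regardless of the function's result.
local ok, res = conn:call('vshard.storage._call',
                          {'storage_map', rid, 'user_func', {}})

-- If anything failed before the maps were sent, the router instead
-- broadcasts a best-effort unref to every storage it managed to ref.
conn:call('vshard.storage._call', {'storage_unref', rid})
```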