To: Oleg Babin, tarantool-patches@dev.tarantool.org, yaroslav.dynnikov@tarantool.org
Date: Wed, 24 Feb 2021 23:04:28 +0100
In-Reply-To: <837d2697-5165-23d6-72bf-f7533af10864@tarantool.org>
Subject: Re: [Tarantool-patches] [PATCH vshard 11/11] router: introduce map_callrw()
From: Vladislav Shpilevoy via Tarantool-patches
Reply-To: Vladislav Shpilevoy
List-Id: Tarantool development patches

Thanks for the review!

On 24.02.2021 11:28, Oleg Babin via Tarantool-patches wrote:
> Thanks a lot for your patch! See 5 comments below.
>
> On 23.02.2021 03:15, Vladislav Shpilevoy wrote:
>> Closes #147
>
> Will read-only map-reduce functions be done in the scope of separate issue/patch?
>
> I know about #173 but seems we need to keep information about map_callro function.

Perhaps there will be a new ticket, yes. Until 173 is fixed, ro refs seem
pointless. The implementation depends on how exactly 173 gets fixed, and at
the moment I haven't even designed that yet.

I am thinking about whether I need to account ro storage refs separately from
rw refs in the scheduler or not. The implementation heavily depends on this.

If I go for considering ro refs separately, it adds a third group of
operations to the scheduler, complicating it significantly, and obviously a
second type of refs to the refs module. It would allow sending buckets and
marking them as GARBAGE/SENT while keeping ro refs. But it would complicate
the garbage collector a bit, as it would need to consider storage ro refs.

On the other hand, it would heavily complicate the code in the scheduler.
Like really heavily, I suppose, but I can be wrong.

Also the win will be zeroed if we ever implement rebalancing of writable
buckets.

Besides, the main reason I am more into a unified type of refs is that they
are used solely for map-reduce, which means they are taken on all storages.
So if you have a SENDING readable bucket on one storage, you have a RECEIVING
non-readable bucket on another storage. That makes such ro refs pointless for
a full cluster scan: you simply won't be able to take a ro ref on the
RECEIVING storage.
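As an illustration of the SENDING/RECEIVING argument above, here is a minimal
sketch in plain Lua. It is not vshard code: the `READABLE` table,
`can_take_ro_ref()`, `can_ref_cluster()` and the two storage tables are
hypothetical, and the set of readable states (ACTIVE, PINNED, SENDING) is an
assumption made for the example. It only shows that while a bucket is in
transfer, a cluster-wide ro ref fails on the receiving side even though the
sending side is still readable.

```lua
-- Hypothetical model, not vshard's API: bucket states assumed readable.
local READABLE = {ACTIVE = true, PINNED = true, SENDING = true}

-- A ro ref on a storage succeeds only if every bucket on it is readable.
local function can_take_ro_ref(storage)
    for bucket_id, state in pairs(storage.buckets) do
        if not READABLE[state] then
            return false, bucket_id
        end
    end
    return true
end

-- Map-reduce refs are taken on all storages, so one refusal fails the scan.
local function can_ref_cluster(storages)
    for name, storage in pairs(storages) do
        local ok, bucket_id = can_take_ro_ref(storage)
        if not ok then
            return false, name, bucket_id
        end
    end
    return true
end

-- Bucket 2 is being moved from storage_a to storage_b.
local storages = {
    storage_a = {buckets = {[1] = 'ACTIVE', [2] = 'SENDING'}},
    storage_b = {buckets = {[2] = 'RECEIVING', [3] = 'ACTIVE'}},
}
local ok, where, bucket_id = can_ref_cluster(storages)
```

In this toy model the ref is refused on storage_b because of bucket 2, no
matter that storage_a could still serve reads for it.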
If we go for unified refs, it would allow keeping the scheduler relatively
simple, but requires an urgent fix of 173. Otherwise you can't be sure your
data is consistent. Even now you can't be sure with normal requests, which
terrifies me and raises a huge question - why the fuck does nobody care? I
think I already had a couple of nightmares about 173.

Another issue - ro refs won't stop the rebalancer from working and won't even
participate in the scheduler if we fix 173 in the simplest way - just
invalidate all refs on the replica if a bucket starts moving. If the
rebalancer works actively, nearly all your ro map-reduces will return errors
during data migration, because buckets will move constantly. No throttling
via the scheduler.

One way to go - manually switch all replica weights to defaults for the time
of data migration. Then map_callro() will go to master nodes and will
participate in the scheduling.

Another way to go - don't care and fix 173 in the simplest way. Weights are
used super rarely anyway AFAIK.

Third way to go - implement some smarter fix of 173. I have many ideas here.
But none of them are quick.

>> diff --git a/vshard/router/init.lua b/vshard/router/init.lua
>> index 97bcb0a..8abd77f 100644
>> --- a/vshard/router/init.lua
>> +++ b/vshard/router/init.lua
>> @@ -44,6 +44,11 @@ if not M then
>>          module_version = 0,
>>          -- Number of router which require collecting lua garbage.
>>          collect_lua_garbage_cnt = 0,
>> +
>> +        ----------------------- Map-Reduce -----------------------
>> +        -- Storage Ref ID. It must be unique for each ref request
>> +        -- and therefore is global and monotonically growing.
>> +        ref_id = 0,
>
> Maybe 0ULL?

Wouldn't break anyway - doubles are precise until 2^53. But an integer should
be faster I hope. Changed to 0ULL.

But still not sure. I asked Igor about this. If ULL/LL are more performant, I
will also use them in storage.ref and storage.sched where possible.
They have many counters.

>> @@ -674,6 +679,177 @@ local function router_call(router, bucket_id, opts, ...)
>>                              ...)
>>  end
>>
>> +local router_map_callrw
>> +
>> +if util.version_is_at_least(1, 10, 0) then
>> +--
>> +-- Consistent Map-Reduce. The given function is called on all masters in the
>> +-- cluster with a guarantee that in case of success it was executed with all
>> +-- buckets being accessible for reads and writes.
>> +--
>> +-- Consistency in scope of map-reduce means all the data was accessible, and
>> +-- didn't move during map requests execution. To preserve the consistency there
>> +-- is a third stage - Ref. So the algorithm is actually Ref-Map-Reduce.
>> +--
>> +-- Refs are broadcast before Map stage to pin the buckets to their storages, and
>> +-- ensure they won't move until maps are done.
>> +--
>> +-- Map requests are broadcast in case all refs are done successfully. They
>> +-- execute the user function + delete the refs to enable rebalancing again.
>> +--
>> +-- On the storages there are additional means to ensure map-reduces don't block
>> +-- rebalancing forever and vice versa.
>> +--
>> +-- The function is not as slow as it may seem - it uses netbox's feature
>> +-- is_async to send refs and maps in parallel. So cost of the function is about
>> +-- 2 network exchanges to the most far storage in terms of time.
>> +--
>> +-- @param router Router instance to use.
>> +-- @param func Name of the function to call.
>> +-- @param args Function arguments passed in netbox style (as an array).
>> +-- @param opts Can only contain 'timeout' as a number of seconds. Note that the
>> +--     refs may end up being kept on the storages during this entire timeout if
>> +--     something goes wrong. For instance, network issues appear. This means
>> +--     better not use a value bigger than necessary. A stuck infinite ref can
>> +--     only be dropped by this router restart/reconnect or the storage restart.
>> +--
>> +-- @return In case of success - a map with replicaset UUID keys and values being
>> +--     what the function returned from the replicaset.
>> +--
>> +-- @return In case of an error - nil, error object, optional UUID of the
>> +--     replicaset where the error happened. UUID may be not present if it wasn't
>> +--     about concrete replicaset. For example, not all buckets were found even
>> +--     though all replicasets were scanned.
>> +--
>> +router_map_callrw = function(router, func, args, opts)
>> +    local replicasets = router.replicasets
>
> It would be great to filter here replicasets with bucket_count = 0 and weight = 0.

Information on the router may be outdated. Even if bucket_count is 0, it may
still mean the discovery didn't get there yet. Or the discovery is simply
disabled or dead due to a bug.

Weight 0 also does not say anything certain about buckets on the storage. It
can be that the config on the router is outdated, or it is outdated on the
storage. Or someone simply does not set the weights in router configs because
they are not used here. Or the weights are really 0 and set everywhere, but
rebalancing is in progress - buckets move from the storage slowly, and the
scheduler allows squeezing in some map-reduces in the meantime.

> In case if such "dummy" replicasets are disabled we get an error "connection refused".

I need more info here. What is a 'disabled' replicaset? And why would 0
discovered buckets or 0 weight lead to the refused connection?

In case this is some cartridge specific shit, this is bad. As you can see
above, I can't ignore such replicasets. I need to send requests to them
anyway.

There is a workaround though - even if an error has occurred, continue
execution if I still cover all the buckets even without the failed storages.
Then having 'disabled' replicasets in the config would result in some
unnecessary faulty requests for each map call, but they would work.

Although I don't know how to 'return' such errors.
I don't want to log them on each request, and don't have a concept of a
'warning' object or something like this. Weird option - in case of this
uncertain success return the result, error, uuid. So the function could
return

- nil, err[, uuid] - fail;
- res - success;
- res, err[, uuid] - success but with a suspicious issue;

>> +    local timeout = opts and opts.timeout or consts.CALL_TIMEOUT_MIN
>> +    local deadline = fiber_clock() + timeout
>> +    local err, err_uuid, res, ok, map
>> +    local futures = {}
>> +    local bucket_count = 0
>> +    local opts_async = {is_async = true}
>> +    local rs_count = 0
>> +    local rid = M.ref_id
>> +    M.ref_id = rid + 1
>> +    -- Nil checks are done explicitly here (== nil instead of 'not'), because
>> +    -- netbox requests return box.NULL instead of nils.
>> +
>> +    --
>> +    -- Ref stage: send.
>> +    --
>> +    for uuid, rs in pairs(replicasets) do
>> +        -- Netbox async requests work only with active connections. Need to wait
>> +        -- for the connection explicitly.
>> +        timeout, err = rs:wait_connected(timeout)
>> +        if timeout == nil then
>> +            err_uuid = uuid
>> +            goto fail
>> +        end
>> +        res, err = rs:callrw('vshard.storage._call',
>> +                              {'storage_ref', rid, timeout}, opts_async)
>> +        if res == nil then
>> +            err_uuid = uuid
>> +            goto fail
>> +        end
>> +        futures[uuid] = res
>> +        rs_count = rs_count + 1
>> +    end
>> +    map = table_new(0, rs_count)
>> +    --
>> +    -- Ref stage: collect.
>> +    --
>> +    for uuid, future in pairs(futures) do
>> +        res, err = future:wait_result(timeout)
>> +        -- Handle netbox error first.
>> +        if res == nil then
>> +            err_uuid = uuid
>> +            goto fail
>> +        end
>> +        -- Ref returns nil,err or bucket count.
>> +        res, err = unpack(res)
>
> Seems `res, err = res[1], res[2]` could be a bit faster.

Indeed. Applied:

====================
@@ -767,7 +767,7 @@ router_map_callrw = function(router, func, args, opts)
             goto fail
         end
         -- Ref returns nil,err or bucket count.
-        res, err = unpack(res)
+        res, err = res[1], res[2]
         if res == nil then
             err_uuid = uuid
====================

On an unrelated note, I gave more thought to your idea of doing a
'map-reduce' but not on the whole cluster. For this we could introduce
'sca_callrw/sca_callro', which means 'scatter'. They could send your requests
to the given replicasets (or all by default) and return whatever comes back.
Just a wrapper on top of a couple of loops with is_async netbox calls. Not
sure if we need to take storage refs for the calls. Their usage might not be
related to buckets at all - just to access the storages and their own local
data, not sharded data.

In case you want scattering for accessing a set of buckets, I could add
`bat_callrw/bat_callro`, which means 'batch'. They would take a set of bucket
ids, match them to replicasets, go to each replicaset just one time for all
its buckets, take a storage ref there, and execute your function. Or ref all
the buckets individually, but I don't know what to do if their number is big.
It would increase the request size and the referencing overhead too much.

To attach some context to the buckets, bat_call could accept pairs
{bucket_id = {func, args}}. Or accept one function name and for each bucket
have {bucket_id = {args}}. And bat_call will call your function with the
given args on the storage. Maybe with such an approach individual refs make
more sense. Don't know.

If you like these ideas, you can file tickets for them so that they don't get
lost and maybe eventually get implemented.
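
The batching idea above can be sketched in plain Lua as follows. This is only
an illustration under assumptions: `group_by_replicaset()` and the
`route_map` table (bucket id -> replicaset UUID) are hypothetical names, not
part of vshard, and the actual network calls and storage refs are left out.
It shows just the grouping step, so that each replicaset would be visited
once for all of its buckets.

```lua
-- Hypothetical helper: group {bucket_id = args} pairs by owning replicaset.
-- route_map is an assumed bucket_id -> replicaset UUID lookup table.
local function group_by_replicaset(route_map, bucket_args)
    local batches = {}
    local unknown = {}
    for bucket_id, args in pairs(bucket_args) do
        local uuid = route_map[bucket_id]
        if uuid == nil then
            -- The bucket location is not known; the caller decides what to do.
            table.insert(unknown, bucket_id)
        else
            local batch = batches[uuid]
            if batch == nil then
                batch = {}
                batches[uuid] = batch
            end
            batch[bucket_id] = args
        end
    end
    table.sort(unknown)
    return batches, unknown
end

local route_map = {[1] = 'uuid_a', [2] = 'uuid_b', [3] = 'uuid_a'}
local batches, unknown = group_by_replicaset(route_map, {
    [1] = {'x'}, [2] = {'y'}, [3] = {'z'}, [7] = {'w'},
})
-- batches.uuid_a holds buckets 1 and 3, batches.uuid_b holds bucket 2,
-- and bucket 7 is reported as unknown.
```

Each entry of `batches` would then become one request to its replicaset,
carrying all the per-bucket arguments at once.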