Tarantool development patches archive
From: Oleg Babin via Tarantool-patches <tarantool-patches@dev.tarantool.org>
To: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>,
	tarantool-patches@dev.tarantool.org,
	yaroslav.dynnikov@tarantool.org
Subject: Re: [Tarantool-patches] [PATCH vshard 11/11] router: introduce map_callrw()
Date: Thu, 25 Feb 2021 15:43:10 +0300
Message-ID: <f3760ddc-55d7-b6db-718b-d6969bb66085@tarantool.org> (raw)
In-Reply-To: <cfee9fa8-b370-78be-c5ec-2c6e3b459bc0@tarantool.org>

Thanks for your detailed explanation. See my comments/answers below.

On 25.02.2021 01:04, Vladislav Shpilevoy wrote:
> Thanks for the review!
>
> On 24.02.2021 11:28, Oleg Babin via Tarantool-patches wrote:
>> Thanks a lot for your patch! See 5 comments below.
>>
>>
>> On 23.02.2021 03:15, Vladislav Shpilevoy wrote:
>>> Closes #147
>> Will read-only map-reduce functions be done in the scope of separate issue/patch?
>>
>> I know about #173 but seems we need to keep information about map_callro function.
> Perhaps there will be a new ticket, yes. Until 173 is fixed, ro refs seem
> pointless. The implementation depends on how exactly to fix 173.
>
> At this moment I didn't even design it yet. I am thinking if I need to
> account ro storage refs separated from rw refs in the scheduler or not.
> Implementation heavily depends on this.
>
> If I go for considering ro refs separated, it adds third group of operations
> to the scheduler complicating it significantly, and a second type of refs to
> the refs module obviously.
>
> It would allow to send buckets and mark them as GARBAGE/SENT while keep ro
> refs. But would complicate the garbage collector a bit as it would need to
> consider storage ro refs.
>
> On the other hand, it would heavily complicate the code in the scheduler.
> Like really heavy, I suppose, but I can be wrong.
>
> Also the win will be zeroed if we ever implement rebalancing of writable
> buckets. Besides, the main reason I am more into unified type of refs is
> that they are used solely for map-reduce. Which means they are taken on all
> storages. This means if you have SENDING readable bucket on one storage,
> you have RECEIVING non-readable bucket on another storage. Which makes
> such ro refs pointless for full cluster scan. Simply won't be able to take
> ro ref on the RECEIVING storage.
>
> If we go for unified refs, it would allow to keep the scheduler relatively
> simple, but requires urgent fix of 173. Otherwise you can't be sure your
> data is consistent. Even now you can't be sure with normal requests, which
> terrifies me and raises a huge question - why the fuck nobody cares? I
> think I already had a couple of nightmares about 173.

Yes, it's a problem, but rebalancing is not a frequent operation, so in
some cases routeall() was enough. Anyway, we didn't have any alternatives.
But map-reduce is a really frequent operation: any request over a secondary
index has to scan the whole cluster.

> Another issue - ro refs won't stop rebalancer from working and don't
> even participate in the scheduler, if we fix 173 in the simplest way -
> just invalidate all refs on the replica if a bucket starts moving. If
> the rebalancer works actively, nearly all your ro map-reduces will return
> errors during data migration because buckets will move constantly. No
> throttling via the scheduler.
>
> One way to go - switch all replica weights to defaults for the time of
> data migration manually. So map_callro() will go to master nodes and
> will participate in the scheduling.
>
> Another way to go - don't care and fix 173 in the simplest way. Weights
> anyway are used super rare AFAIK.
>
> Third way to go - implement some smarter fix of 173. I have many ideas here.
> But neither of them are quick.
>
>>> diff --git a/vshard/router/init.lua b/vshard/router/init.lua
>>> index 97bcb0a..8abd77f 100644
>>> --- a/vshard/router/init.lua
>>> +++ b/vshard/router/init.lua
>>> @@ -44,6 +44,11 @@ if not M then
>>>            module_version = 0,
>>>            -- Number of router which require collecting lua garbage.
>>>            collect_lua_garbage_cnt = 0,
>>> +
>>> +        ----------------------- Map-Reduce -----------------------
>>> +        -- Storage Ref ID. It must be unique for each ref request
>>> +        -- and therefore is global and monotonically growing.
>>> +        ref_id = 0,
>> Maybe 0ULL?
> Wouldn't break anyway - doubles are precise until 2^53. But an integer
> should be faster I hope.
>
> Changed to 0ULL.
>
> But still not sure. I asked Igor about this. If ULL/LL are more performant,
> I will also use them in storage.ref and storage.sched where possible. They
> have many counters.
>
>>>    @@ -674,6 +679,177 @@ local function router_call(router, bucket_id, opts, ...)
>>>                                ...)
>>>    end
>>>    +local router_map_callrw
>>> +
>>> +if util.version_is_at_least(1, 10, 0) then
>>> +--
>>> +-- Consistent Map-Reduce. The given function is called on all masters in the
>>> +-- cluster with a guarantee that in case of success it was executed with all
>>> +-- buckets being accessible for reads and writes.
>>> +--
>>> +-- Consistency in scope of map-reduce means all the data was accessible, and
>>> +-- didn't move during map requests execution. To preserve the consistency there
>>> +-- is a third stage - Ref. So the algorithm is actually Ref-Map-Reduce.
>>> +--
>>> +-- Refs are broadcast before Map stage to pin the buckets to their storages, and
>>> +-- ensure they won't move until maps are done.
>>> +--
>>> +-- Map requests are broadcast in case all refs are done successfully. They
>>> +-- execute the user function + delete the refs to enable rebalancing again.
>>> +--
>>> +-- On the storages there are additional means to ensure map-reduces don't block
>>> +-- rebalancing forever and vice versa.
>>> +--
>>> +-- The function is not as slow as it may seem - it uses netbox's feature
>>> +-- is_async to send refs and maps in parallel. So cost of the function is about
>>> +-- 2 network exchanges to the most far storage in terms of time.
>>> +--
>>> +-- @param router Router instance to use.
>>> +-- @param func Name of the function to call.
>>> +-- @param args Function arguments passed in netbox style (as an array).
>>> +-- @param opts Can only contain 'timeout' as a number of seconds. Note that the
>>> +--     refs may end up being kept on the storages during this entire timeout if
>>> +--     something goes wrong. For instance, network issues appear. This means
>>> +--     better not use a value bigger than necessary. A stuck infinite ref can
>>> +--     only be dropped by this router restart/reconnect or the storage restart.
>>> +--
>>> +-- @return In case of success - a map with replicaset UUID keys and values being
>>> +--     what the function returned from the replicaset.
>>> +--
>>> +-- @return In case of an error - nil, error object, optional UUID of the
>>> +--     replicaset where the error happened. UUID may be not present if it wasn't
>>> +--     about concrete replicaset. For example, not all buckets were found even
>>> +--     though all replicasets were scanned.
>>> +--
>>> +router_map_callrw = function(router, func, args, opts)
>>> +    local replicasets = router.replicasets
>> It would be great to filter here replicasets with bucket_count = 0 and weight = 0.
> Information on the router may be outdated. Even if bucket_count is 0,
> it may still mean the discovery didn't get to there yet. Or the
> discovery is simply disabled or dead due to a bug.
>
> Weight 0 also does not say anything certain about buckets on the
> storage. It can be that config on the router is outdated, or it is
> outdated on the storage. Or someone simply does not set the weights
> for router configs because they are not used here. Or the weights are
> really 0 and set everywhere, but rebalancing is in progress - buckets
> move from the storage slowly, and the scheduler allows to squeeze some
> map-reduces in the meantime.
>
>> In case if such "dummy" replicasets are disabled we get an error "connection refused".
> I need more info here. What is 'disabled' replicaset? And why would
> 0 discovered buckets or 0 weight lead to the refused connection?
>
> In case this is some cartridge specific shit, this is bad. As you
> can see above, I can't ignore such replicasets. I need to send requests
> to them anyway.
>
> There is a workaround though - even if an error has occurred, continue
> execution if even without the failed storages I still cover all the buckets.
> Then having 'disabled' replicasets in the config would result in some
> unnecessary faulty requests for each map call, but they would work.
>
> Although I don't know how to 'return' such errors. I don't want to log them
> on each request, and don't have a concept of a 'warning' object or something
> like this. Weird option - in case of this uncertain success return the
> result, error, uuid. So the function could return
>
> - nil, err[, uuid] - fail;
> - res              - success;
> - res, err[, uuid] - success but with a suspicious issue;
>
It seems I didn't state the problem fully. Maybe it's not even a relevant
issue, and users who do this just create problems for themselves. But there
was a case when our customers added a new replicaset (in fact, a single
instance) to the cluster. This instance didn't have any data and had
weight=0.

Then, after some time, they simply turned this instance off, and all
requests that perform map-reduce started to fail with a "Connection
refused" error.

This raises the question: "Why do our requests fail if we disable an
instance that doesn't have any data?"

Yes, requiring weight=0 is obviously not enough: while rebalancing is in
progress, a replicaset with weight=0 can still contain some data.


As for the option to complete a request even if some storage failed, I'm
not sure it's really needed. Usually we don't need a partial result
(especially if it adds extra workload).
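
For reference, here is how I understand the workaround you describe, as a
rough sketch only. check_coverage() and its arguments are hypothetical names,
not part of vshard, and the sketch assumes the bucket counts returned by the
ref stage are accurate:

```lua
-- Hypothetical helper, not part of vshard: decide whether the ref replies
-- that did arrive already cover every bucket in the cluster.
-- ref_results: {[uuid] = bucket_count} for replicasets whose ref succeeded.
-- ref_errors:  {[uuid] = err} for replicasets whose ref failed.
local function check_coverage(ref_results, ref_errors, total_bucket_count)
    local covered = 0
    for _, bucket_count in pairs(ref_results) do
        covered = covered + bucket_count
    end
    if covered == total_bucket_count then
        -- Every bucket is pinned, so the failed replicasets hold no buckets.
        return true
    end
    -- Some buckets are unreachable: report the first failure in UUID order.
    local uuids = {}
    for uuid in pairs(ref_errors) do
        table.insert(uuids, uuid)
    end
    table.sort(uuids)
    local uuid = uuids[1]
    return false, ref_errors[uuid], uuid
end
```

With this shape a turned-off empty instance would not fail the request, but
every map call would still pay for one failing ref attempt.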


Regarding the question about errors: cartridge has an example of an error
object for its map_call function. For errors, cartridge creates an "object"
that contains info about all the errors that were returned. It's not an
excellent solution, but I think Yaroslav could share some thoughts here.


https://github.com/tarantool/cartridge/blob/cf195bc9576eb460d66d609c357bec5014a90d21/cartridge/pool.lua#L235
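
To make the idea concrete, an aggregated error could look roughly like the
sketch below. This is not cartridge's actual code: collect_errors() and the
field names 'message' and 'suberrors' are assumptions chosen only for
illustration.

```lua
-- Hypothetical sketch: build one error object out of per-replicaset errors.
-- errors_by_uuid: {[replicaset_uuid] = err}
local function collect_errors(errors_by_uuid)
    local uuids = {}
    for uuid in pairs(errors_by_uuid) do
        table.insert(uuids, uuid)
    end
    if #uuids == 0 then
        -- Nothing failed, so there is no error object to return.
        return nil
    end
    -- Sort the UUIDs to keep the summary message deterministic.
    table.sort(uuids)
    local messages = {}
    for i, uuid in ipairs(uuids) do
        messages[i] = string.format('%s: %s', uuid,
                                    tostring(errors_by_uuid[uuid]))
    end
    return {
        message = table.concat(messages, '; '),
        -- Keep the original errors so the caller can inspect each of them.
        suberrors = errors_by_uuid,
    }
end
```

The caller gets a single object to check against nil, and can still find out
which replicaset failed and why.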


>>> +    local timeout = opts and opts.timeout or consts.CALL_TIMEOUT_MIN
>>> +    local deadline = fiber_clock() + timeout
>>> +    local err, err_uuid, res, ok, map
>>> +    local futures = {}
>>> +    local bucket_count = 0
>>> +    local opts_async = {is_async = true}
>>> +    local rs_count = 0
>>> +    local rid = M.ref_id
>>> +    M.ref_id = rid + 1
>>> +    -- Nil checks are done explicitly here (== nil instead of 'not'), because
>>> +    -- netbox requests return box.NULL instead of nils.
>>> +
>>> +    --
>>> +    -- Ref stage: send.
>>> +    --
>>> +    for uuid, rs in pairs(replicasets) do
>>> +        -- Netbox async requests work only with active connections. Need to wait
>>> +        -- for the connection explicitly.
>>> +        timeout, err = rs:wait_connected(timeout)
>>> +        if timeout == nil then
>>> +            err_uuid = uuid
>>> +            goto fail
>>> +        end
>>> +        res, err = rs:callrw('vshard.storage._call',
>>> +                              {'storage_ref', rid, timeout}, opts_async)
>>> +        if res == nil then
>>> +            err_uuid = uuid
>>> +            goto fail
>>> +        end
>>> +        futures[uuid] = res
>>> +        rs_count = rs_count + 1
>>> +    end
>>> +    map = table_new(0, rs_count)
>>> +    --
>>> +    -- Ref stage: collect.
>>> +    --
>>> +    for uuid, future in pairs(futures) do
>>> +        res, err = future:wait_result(timeout)
>>> +        -- Handle netbox error first.
>>> +        if res == nil then
>>> +            err_uuid = uuid
>>> +            goto fail
>>> +        end
>>> +        -- Ref returns nil,err or bucket count.
>>> +        res, err = unpack(res)
>> Seems `res, err = res[1], res[2]` could be a bit faster.
> Indeed. Applied:
>
> ====================
> @@ -767,7 +767,7 @@ router_map_callrw = function(router, func, args, opts)
>               goto fail
>           end
>           -- Ref returns nil,err or bucket count.
> -        res, err = unpack(res)
> +        res, err = res[1], res[2]
>           if res == nil then
>               err_uuid = uuid
> ====================
>
> On a not related note, I gave more thought to your idea with
> doing a 'map-reduce' but not on the whole cluster. And for this we
> could introduce 'sca_callrw/sca_callro'. Which mean 'scatter'.
>
> They could send your requests to the given replicasets (or all by
> default) and return whatever is back. Just a wrapper on top of a
> couple of loops with is_async netbox calls. Not sure if need to
> take storage refs for the calls. Their usage might be not related
> to buckets really. Just to access the storages and their own local
> data, not sharded data.
>
> In case you want scattering for accessing a set of buckets, I
> could add `bat_callrw/bat_callro`. Which means 'batch'. They would
> take a set of bucket ids, match them to replicasets, go to each
> replicaset just one time for all its buckets, take storage ref
> there, and execute your function. Or ref all the buckets individually
> but I don't know what to do if their number is big. Would increase
> request size, and referencing overhead too much.
>
> To attach some context to the buckets bat_call could accept pairs
> {bucket_id = {func, args}}. Or accept one function name and for
> each bucket have {bucket_id = {args}}. And bat_call will call your
> function with the given args on the storage. Maybe with such approach
> individual refs make more sense. Don't know.
>
> If you like these ideas, you can file tickets for them so as they
> wouldn't be lost and maybe eventually would be implemented.

Batch requests are quite a common case, and it seems the issue is already
filed:
https://github.com/tarantool/vshard/issues/176.

I'll think about "sca_callrw/sca_callro", and if I find some use cases I
will file an issue. But currently it seems my thoughts about scatter
operations are closely related to batch requests.
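
Just to illustrate the "match buckets to replicasets and go to each
replicaset once" step of a batch call, here is a minimal sketch. The names
group_buckets() and route_map are hypothetical, and the sketch assumes the
bucket-to-replicaset mapping is already known on the router:

```lua
-- Hypothetical sketch: group bucket ids by the replicaset that owns them.
-- bucket_ids: array of bucket ids requested by the user.
-- route_map:  {[bucket_id] = replicaset_uuid}, assumed to be already known.
local function group_buckets(bucket_ids, route_map)
    local groups = {}
    local unknown = {}
    for _, bucket_id in ipairs(bucket_ids) do
        local uuid = route_map[bucket_id]
        if uuid == nil then
            -- The owner is not known yet; the caller decides what to do.
            table.insert(unknown, bucket_id)
        else
            local group = groups[uuid]
            if group == nil then
                group = {}
                groups[uuid] = group
            end
            table.insert(group, bucket_id)
        end
    end
    return groups, unknown
end
```

Each key of 'groups' then corresponds to exactly one request to a replicaset,
no matter how many of its buckets were asked for.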


