Tarantool development patches archive
From: Vladislav Shpilevoy via Tarantool-patches <tarantool-patches@dev.tarantool.org>
To: Oleg Babin <olegrok@tarantool.org>,
	tarantool-patches@dev.tarantool.org,
	yaroslav.dynnikov@tarantool.org
Subject: Re: [Tarantool-patches] [PATCH vshard 11/11] router: introduce map_callrw()
Date: Wed, 24 Feb 2021 23:04:28 +0100
Message-ID: <cfee9fa8-b370-78be-c5ec-2c6e3b459bc0@tarantool.org>
In-Reply-To: <837d2697-5165-23d6-72bf-f7533af10864@tarantool.org>

Thanks for the review!

On 24.02.2021 11:28, Oleg Babin via Tarantool-patches wrote:
> Thanks a lot for your patch! See 5 comments below.
> 
> 
> On 23.02.2021 03:15, Vladislav Shpilevoy wrote:
>> Closes #147
> 
> Will read-only map-reduce functions be done in the scope of separate issue/patch?
> 
> I know about #173 but seems we need to keep information about map_callro function.

Perhaps there will be a new ticket, yes. Until 173 is fixed, ro refs seem
pointless. The implementation depends on how exactly to fix 173.

I haven't even designed it yet. I am still deciding whether I need to
account for ro storage refs separately from rw refs in the scheduler.
The implementation heavily depends on this.

If I account for ro refs separately, it adds a third group of operations
to the scheduler, complicating it significantly, and obviously a second
type of refs to the refs module.

It would allow sending buckets and marking them as GARBAGE/SENT while
keeping ro refs. But it would complicate the garbage collector a bit, as
it would need to consider storage ro refs.

On the other hand, it would heavily complicate the code in the scheduler.
Really heavily, I suppose, but I may be wrong.

Also the win will be zeroed if we ever implement rebalancing of writable
buckets. Besides, the main reason I lean towards a unified type of refs is
that they are used solely for map-reduce, which means they are taken on all
storages. So if you have a SENDING readable bucket on one storage, you have
a RECEIVING non-readable bucket on another storage. That makes such ro refs
pointless for a full cluster scan: you simply won't be able to take an ro
ref on the RECEIVING storage.

If we go for unified refs, it would keep the scheduler relatively
simple, but it requires an urgent fix of 173. Otherwise you can't be sure your
data is consistent. Even now you can't be sure with normal requests, which
terrifies me and raises a huge question - why the fuck nobody cares? I
think I already had a couple of nightmares about 173.

Another issue - ro refs won't stop the rebalancer from working and won't
even participate in the scheduler if we fix 173 in the simplest way -
just invalidate all refs on the replica when a bucket starts moving. If
the rebalancer works actively, nearly all your ro map-reduces will return
errors during data migration because buckets will move constantly. There
is no throttling via the scheduler.

One way to go - manually switch all replica weights to defaults for the
duration of data migration. Then map_callro() will go to master nodes and
will participate in the scheduling.

Another way to go - don't care and fix 173 in the simplest way. Weights
are used very rarely anyway, AFAIK.

Third way to go - implement some smarter fix of 173. I have many ideas here.
But none of them is quick.

>> diff --git a/vshard/router/init.lua b/vshard/router/init.lua
>> index 97bcb0a..8abd77f 100644
>> --- a/vshard/router/init.lua
>> +++ b/vshard/router/init.lua
>> @@ -44,6 +44,11 @@ if not M then
>>           module_version = 0,
>>           -- Number of router which require collecting lua garbage.
>>           collect_lua_garbage_cnt = 0,
>> +
>> +        ----------------------- Map-Reduce -----------------------
>> +        -- Storage Ref ID. It must be unique for each ref request
>> +        -- and therefore is global and monotonically growing.
>> +        ref_id = 0,
> 
> Maybe 0ULL?

Wouldn't break anyway - doubles are precise until 2^53. But an integer
should be faster I hope.

Changed to 0ULL.

But still not sure. I asked Igor about this. If ULL/LL are more performant,
I will also use them in storage.ref and storage.sched where possible. They
have many counters.
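
To illustrate the 2^53 remark, here is a minimal plain-Lua sketch. The
next_ref_id() helper and the state table are hypothetical names used only
for this example; they mirror the 'rid = M.ref_id; M.ref_id = rid + 1'
pattern from the patch, but with a double counter instead of 0ULL.

```lua
-- Doubles represent every integer exactly only up to 2^53.
local limit = 2^53

-- Hypothetical allocator mirroring the patch: take the current id,
-- then bump the counter for the next caller.
local function next_ref_id(state)
    local rid = state.ref_id
    state.ref_id = rid + 1
    return rid
end

local state = {ref_id = 0}
local first = next_ref_id(state)
local second = next_ref_id(state)

-- At the limit, adding 1 is lost to rounding, so a double counter
-- would stop producing unique ids there.
local saturated = (limit + 1 == limit)
```

At one ref per microsecond it would take hundreds of years to reach that
limit, which is why the double counter would not break in practice.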

>> @@ -674,6 +679,177 @@ local function router_call(router, bucket_id, opts, ...)
>>                               ...)
>>  end
>> +local router_map_callrw
>> +
>> +if util.version_is_at_least(1, 10, 0) then
>> +--
>> +-- Consistent Map-Reduce. The given function is called on all masters in the
>> +-- cluster with a guarantee that in case of success it was executed with all
>> +-- buckets being accessible for reads and writes.
>> +--
>> +-- Consistency in scope of map-reduce means all the data was accessible, and
>> +-- didn't move during map requests execution. To preserve the consistency there
>> +-- is a third stage - Ref. So the algorithm is actually Ref-Map-Reduce.
>> +--
>> +-- Refs are broadcast before Map stage to pin the buckets to their storages, and
>> +-- ensure they won't move until maps are done.
>> +--
>> +-- Map requests are broadcast in case all refs are done successfully. They
>> +-- execute the user function + delete the refs to enable rebalancing again.
>> +--
>> +-- On the storages there are additional means to ensure map-reduces don't block
>> +-- rebalancing forever and vice versa.
>> +--
>> +-- The function is not as slow as it may seem - it uses netbox's feature
>> +-- is_async to send refs and maps in parallel. So cost of the function is about
>> +-- 2 network exchanges to the most far storage in terms of time.
>> +--
>> +-- @param router Router instance to use.
>> +-- @param func Name of the function to call.
>> +-- @param args Function arguments passed in netbox style (as an array).
>> +-- @param opts Can only contain 'timeout' as a number of seconds. Note that the
>> +--     refs may end up being kept on the storages during this entire timeout if
>> +--     something goes wrong. For instance, network issues appear. This means
>> +--     better not use a value bigger than necessary. A stuck infinite ref can
>> +--     only be dropped by this router restart/reconnect or the storage restart.
>> +--
>> +-- @return In case of success - a map with replicaset UUID keys and values being
>> +--     what the function returned from the replicaset.
>> +--
>> +-- @return In case of an error - nil, error object, optional UUID of the
>> +--     replicaset where the error happened. UUID may be not present if it wasn't
>> +--     about concrete replicaset. For example, not all buckets were found even
>> +--     though all replicasets were scanned.
>> +--
>> +router_map_callrw = function(router, func, args, opts)
>> +    local replicasets = router.replicasets
> 
> It would be great to filter here replicasets with bucket_count = 0 and weight = 0.

Information on the router may be outdated. Even if bucket_count is 0,
it may just mean the discovery hasn't gotten there yet. Or the
discovery is simply disabled or dead due to a bug.

Weight 0 also does not say anything certain about buckets on the
storage. It can be that the config on the router is outdated, or it is
outdated on the storage. Or someone simply does not set the weights
for router configs because they are not used here. Or the weights are
really 0 and set everywhere, but rebalancing is in progress - buckets
move from the storage slowly, and the scheduler lets some map-reduces
squeeze in meanwhile.

> In case if such "dummy" replicasets are disabled we get an error "connection refused".

I need more info here. What is a 'disabled' replicaset? And why would
0 discovered buckets or 0 weight lead to a refused connection?

In case this is some cartridge specific shit, this is bad. As you
can see above, I can't ignore such replicasets. I need to send requests
to them anyway.

There is a workaround though - even if an error has occurred, continue
execution as long as the remaining storages still cover all the buckets.
Then having 'disabled' replicasets in the config would result in some
unnecessary failing requests for each map call, but the calls would work.
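
A rough sketch of that coverage check in plain Lua, not the actual vshard
code. The function name, the shape of ref_results (uuid -> bucket count,
or false for a failed replicaset) and total_bucket_count are assumptions
made for the example.

```lua
-- Decide whether the replicasets that answered the ref request still
-- cover every bucket of the cluster.
-- ref_results: map uuid -> bucket count (number), or false on failure.
-- total_bucket_count: expected number of buckets in the whole cluster.
local function covers_all_buckets(ref_results, total_bucket_count)
    local covered = 0
    local failed = {}
    for uuid, count in pairs(ref_results) do
        if type(count) == 'number' then
            covered = covered + count
        else
            table.insert(failed, uuid)
        end
    end
    -- Sort to make the list of failed replicasets deterministic.
    table.sort(failed)
    return covered == total_bucket_count, failed
end
```

With {a = 1500, b = 1500, c = false} and 3000 buckets in total the check
passes and reports 'c' as failed, so the map stage could still proceed.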

Although I don't know how to 'return' such errors. I don't want to log them
on each request, and don't have a concept of a 'warning' object or something
like this. Weird option - in case of this uncertain success return the
result, error, uuid. So the function could return

- nil, err[, uuid] - fail;
- res              - success;
- res, err[, uuid] - success but with a suspicious issue;
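
For illustration, a caller could tell the three shapes apart roughly like
this. It is only a sketch of the proposed convention, not an existing API;
classify() and its return strings are made up for the example.

```lua
-- Classify the values returned by a map call that follows the proposed
-- convention: nil, err[, uuid] / res / res, err[, uuid].
local function classify(res, err, uuid)
    if res == nil then
        -- No result at all: a plain failure.
        return 'fail'
    elseif err ~= nil then
        -- A result together with an error: worked, but something is off.
        return 'suspicious'
    end
    return 'success'
end
```

The uuid argument is accepted only to match the return shape; it does not
affect the classification and would just be reported to the user.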

>> +    local timeout = opts and opts.timeout or consts.CALL_TIMEOUT_MIN
>> +    local deadline = fiber_clock() + timeout
>> +    local err, err_uuid, res, ok, map
>> +    local futures = {}
>> +    local bucket_count = 0
>> +    local opts_async = {is_async = true}
>> +    local rs_count = 0
>> +    local rid = M.ref_id
>> +    M.ref_id = rid + 1
>> +    -- Nil checks are done explicitly here (== nil instead of 'not'), because
>> +    -- netbox requests return box.NULL instead of nils.
>> +
>> +    --
>> +    -- Ref stage: send.
>> +    --
>> +    for uuid, rs in pairs(replicasets) do
>> +        -- Netbox async requests work only with active connections. Need to wait
>> +        -- for the connection explicitly.
>> +        timeout, err = rs:wait_connected(timeout)
>> +        if timeout == nil then
>> +            err_uuid = uuid
>> +            goto fail
>> +        end
>> +        res, err = rs:callrw('vshard.storage._call',
>> +                              {'storage_ref', rid, timeout}, opts_async)
>> +        if res == nil then
>> +            err_uuid = uuid
>> +            goto fail
>> +        end
>> +        futures[uuid] = res
>> +        rs_count = rs_count + 1
>> +    end
>> +    map = table_new(0, rs_count)
>> +    --
>> +    -- Ref stage: collect.
>> +    --
>> +    for uuid, future in pairs(futures) do
>> +        res, err = future:wait_result(timeout)
>> +        -- Handle netbox error first.
>> +        if res == nil then
>> +            err_uuid = uuid
>> +            goto fail
>> +        end
>> +        -- Ref returns nil,err or bucket count.
>> +        res, err = unpack(res)
> 
> Seems `res, err = res[1], res[2]` could be a bit faster.

Indeed. Applied:

====================
@@ -767,7 +767,7 @@ router_map_callrw = function(router, func, args, opts)
             goto fail
         end
         -- Ref returns nil,err or bucket count.
-        res, err = unpack(res)
+        res, err = res[1], res[2]
         if res == nil then
             err_uuid = uuid
====================
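
A small plain-Lua sketch showing that both forms yield the same first two
values for a well-formed response array. The helper names are hypothetical,
and false stands in for box.NULL, which is not available outside Tarantool.

```lua
-- Lua 5.2+ has table.unpack, Lua 5.1/LuaJIT have the global unpack.
local unpack = table.unpack or unpack

-- Original form: a function call returning all array elements.
local function via_unpack(t)
    local res, err = unpack(t)
    return res, err
end

-- Applied form: two direct index reads, no function call.
local function via_index(t)
    return t[1], t[2]
end

-- A successful ref response carries only the bucket count.
local ok_a, ok_b = via_unpack({3000})
local ok_c, ok_d = via_index({3000})

-- A failed one carries an empty first value and an error.
local bad_a, bad_b = via_unpack({false, 'ref failed'})
local bad_c, bad_d = via_index({false, 'ref failed'})
```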

On an unrelated note, I gave more thought to your idea of
doing a 'map-reduce' but not on the whole cluster. For this we
could introduce 'sca_callrw/sca_callro', where 'sca' means 'scatter'.

They could send your requests to the given replicasets (or all by
default) and return whatever comes back. Just a wrapper on top of a
couple of loops with is_async netbox calls. Not sure whether they need
to take storage refs for the calls. Their usage might not be related
to buckets at all - just to access the storages and their own local
data, not sharded data.

In case you want scattering for accessing a set of buckets, I
could add `bat_callrw/bat_callro`, where 'bat' means 'batch'. They would
take a set of bucket ids, match them to replicasets, go to each
replicaset just one time for all its buckets, take a storage ref
there, and execute your function. Or ref all the buckets individually,
but I don't know what to do if their number is big. It would increase
the request size and the referencing overhead too much.
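
The 'match them to replicasets' step could look roughly like the sketch
below. It is plain Lua under assumptions: group_by_replicaset() and the
route_map table (bucket_id -> replicaset uuid) are hypothetical stand-ins
for whatever the router actually knows about bucket locations.

```lua
-- Group bucket ids by the replicaset which is believed to own them, so
-- each replicaset can be visited just once for all its buckets.
-- bucket_ids: array of bucket ids.
-- route_map: hypothetical map bucket_id -> replicaset uuid.
local function group_by_replicaset(bucket_ids, route_map)
    local groups = {}
    local unknown = {}
    for _, bucket_id in ipairs(bucket_ids) do
        local uuid = route_map[bucket_id]
        if uuid == nil then
            -- Location is not known yet, needs discovery first.
            table.insert(unknown, bucket_id)
        else
            local group = groups[uuid]
            if group == nil then
                group = {}
                groups[uuid] = group
            end
            table.insert(group, bucket_id)
        end
    end
    return groups, unknown
end
```

Each key of groups would then get a single request carrying its bucket ids,
and the unknown ones would have to be resolved or reported as an error.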

To attach some context to the buckets, bat_call could accept pairs
{bucket_id = {func, args}}. Or accept one function name and for
each bucket have {bucket_id = {args}}. And bat_call will call your
function with the given args on the storage. Maybe with such an approach
individual refs make more sense. Don't know.

If you like these ideas, you can file tickets for them so that they
aren't lost and maybe eventually get implemented.
