Tarantool development patches archive
 help / color / mirror / Atom feed
From: Vladislav Shpilevoy via Tarantool-patches <tarantool-patches@dev.tarantool.org>
To: tarantool-patches@dev.tarantool.org, olegrok@tarantool.org,
	yaroslav.dynnikov@tarantool.org
Subject: [Tarantool-patches] [PATCH vshard 11/11] router: introduce map_callrw()
Date: Tue, 23 Feb 2021 01:15:39 +0100	[thread overview]
Message-ID: <f799cf4c753c0e271cfa9309aac4cd33a6a65aaf.1614039039.git.v.shpilevoy@tarantool.org> (raw)
In-Reply-To: <cover.1614039039.git.v.shpilevoy@tarantool.org>

Closes #147

@TarantoolBot document
Title: vshard.router.map_callrw()

`vshard.router.map_callrw()` implements consistent map-reduce over
the entire cluster. Consistency means all the data was accessible,
and didn't move during map requests execution.

It is useful when need to access potentially all the data in the
cluster or simply huge number of buckets scattered over the
instances and whose individual `vshard.router.call()` would take
too long.

`Map_callrw()` takes name of the function to call on the storages,
arguments in the format of array, and not required options map.
The only supported option for now is timeout which is applied to
the entire call. Not to individual calls for each storage.
```
vshard.router.map_callrw(func_name, args[, {timeout = <seconds>}])
```

The chosen function is called on the master node of each
replicaset with the given arguments.

In case of success `vshard.router.map_callrw()` returns a map with
replicaset UUIDs as keys and results of the user's function as
values, like this:
```
{uuid1 = {res1}, uuid2 = {res2}, ...}
```
If the function returned `nil` or `box.NULL` from one of the
storages, it won't be present in the result map.

In case of fail it returns nil, error object, and optional
replicaset UUID where the error happened. UUID may not be returned
if the error wasn't about a concrete replicaset.

For instance, the method fails if not all buckets were found even
if all replicasets were scanned successfully.

Handling the result looks like this:
```Lua
res, err, uuid = vshard.router.map_callrw(...)
if not res then
    -- Error.
    -- 'err' - error object. 'uuid' - optional UUID of replicaset
    -- where the error happened.
    ...
else
    -- Success.
    for uuid, value in pairs(res) do
        ...
    end
end
```

Map-Reduce in vshard works in 3 stages: Ref, Map, Reduce. Ref is
an internal stage which is supposed to ensure data consistency
during user's function execution on all nodes.

Reduce is not performed by vshard. It is what user's code does
with results of `map_callrw()`.

Consistency, as it is defined for map-reduce, is not compatible
with rebalancing. Because any bucket move would make the sender
and receiver nodes 'inconsistent' - it is not possible to call a
function on them which could simply access all the data without
doing `vshard.storage.bucket_ref()`.

This makes Ref stage very intricate as it must work together with
rebalancer to ensure neither of them block each other.

For this storage has a scheduler specifically for bucket moves and
storage refs which shares storage time between them fairly.

Definition of fairness depends on how long and frequent the moves
and refs are. This can be configured using storage options
`sched_move_quota` and `sched_ref_quota`. See more details about
them in the corresponding doc section.

The scheduler configuration may affect map-reduce requests if they
are used a lot during rebalancing.

Keep in mind that it is not a good idea to use too big timeouts
for `map_callrw()`. Because the router will try to block the
bucket moves for the given timeout on all storages. And in case
something will go wrong, the block will remain for the entire
timeout. This means, in particular, having the timeout longer
than, say, minutes is a super bad way to go unless it is for
tests only.

Also it is important to remember that `map_callrw()` does not
work on replicas. It works only on masters. This makes it unusable
if at least one replicaset has its master node down.
---
 test/router/map-reduce.result   | 636 ++++++++++++++++++++++++++++++++
 test/router/map-reduce.test.lua | 258 +++++++++++++
 test/router/router.result       |   9 +-
 test/upgrade/upgrade.result     |   5 +-
 vshard/replicaset.lua           |  34 ++
 vshard/router/init.lua          | 180 +++++++++
 vshard/storage/init.lua         |  47 +++
 7 files changed, 1164 insertions(+), 5 deletions(-)
 create mode 100644 test/router/map-reduce.result
 create mode 100644 test/router/map-reduce.test.lua

diff --git a/test/router/map-reduce.result b/test/router/map-reduce.result
new file mode 100644
index 0000000..1e8995a
--- /dev/null
+++ b/test/router/map-reduce.result
@@ -0,0 +1,636 @@
+-- test-run result file version 2
+test_run = require('test_run').new()
+ | ---
+ | ...
+REPLICASET_1 = { 'storage_1_a', 'storage_1_b' }
+ | ---
+ | ...
+REPLICASET_2 = { 'storage_2_a', 'storage_2_b' }
+ | ---
+ | ...
+test_run:create_cluster(REPLICASET_1, 'router')
+ | ---
+ | ...
+test_run:create_cluster(REPLICASET_2, 'router')
+ | ---
+ | ...
+util = require('util')
+ | ---
+ | ...
+util.wait_master(test_run, REPLICASET_1, 'storage_1_a')
+ | ---
+ | ...
+util.wait_master(test_run, REPLICASET_2, 'storage_2_a')
+ | ---
+ | ...
+util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, 'bootstrap_storage()')
+ | ---
+ | ...
+util.push_rs_filters(test_run)
+ | ---
+ | ...
+_ = test_run:cmd("create server router_1 with script='router/router_1.lua'")
+ | ---
+ | ...
+_ = test_run:cmd("start server router_1")
+ | ---
+ | ...
+
+_ = test_run:switch("router_1")
+ | ---
+ | ...
+util = require('util')
+ | ---
+ | ...
+
+--
+-- gh-147: consistent map-reduce.
+--
+big_timeout = 1000000
+ | ---
+ | ...
+big_timeout_opts = {timeout = big_timeout}
+ | ---
+ | ...
+vshard.router.cfg(cfg)
+ | ---
+ | ...
+vshard.router.bootstrap(big_timeout_opts)
+ | ---
+ | - true
+ | ...
+-- Trivial basic sanity test. Multireturn is not supported, should be truncated.
+vshard.router.map_callrw('echo', {1, 2, 3}, big_timeout_opts)
+ | ---
+ | - <replicaset_2>:
+ |   - 1
+ |   <replicaset_1>:
+ |   - 1
+ | ...
+
+--
+-- Fail during connecting to storages. For the succeeded storages the router
+-- tries to send unref.
+--
+timeout = 0.001
+ | ---
+ | ...
+timeout_opts = {timeout = timeout}
+ | ---
+ | ...
+
+test_run:cmd('stop server storage_1_a')
+ | ---
+ | - true
+ | ...
+ok, err = vshard.router.map_callrw('echo', {1}, timeout_opts)
+ | ---
+ | ...
+assert(not ok and err.message)
+ | ---
+ | - Timeout exceeded
+ | ...
+-- Even if ref was sent successfully to storage_2_a, it was deleted before
+-- router returned an error.
+_ = test_run:switch('storage_2_a')
+ | ---
+ | ...
+lref = require('vshard.storage.ref')
+ | ---
+ | ...
+-- Wait because unref is sent asynchronously. Could arrive not immediately.
+test_run:wait_cond(function() return lref.count == 0 end)
+ | ---
+ | - true
+ | ...
+
+_ = test_run:switch('router_1')
+ | ---
+ | ...
+test_run:cmd('start server storage_1_a')
+ | ---
+ | - true
+ | ...
+-- Works again - router waited for connection being established.
+vshard.router.map_callrw('echo', {1}, big_timeout_opts)
+ | ---
+ | - <replicaset_2>:
+ |   - 1
+ |   <replicaset_1>:
+ |   - 1
+ | ...
+
+--
+-- Do all the same but with another storage being stopped. The same test is done
+-- again because can't tell at which of the tests to where the router will go
+-- first.
+--
+test_run:cmd('stop server storage_2_a')
+ | ---
+ | - true
+ | ...
+ok, err = vshard.router.map_callrw('echo', {1}, timeout_opts)
+ | ---
+ | ...
+assert(not ok and err.message)
+ | ---
+ | - Timeout exceeded
+ | ...
+_ = test_run:switch('storage_1_a')
+ | ---
+ | ...
+lref = require('vshard.storage.ref')
+ | ---
+ | ...
+test_run:wait_cond(function() return lref.count == 0 end)
+ | ---
+ | - true
+ | ...
+
+_ = test_run:switch('router_1')
+ | ---
+ | ...
+test_run:cmd('start server storage_2_a')
+ | ---
+ | - true
+ | ...
+vshard.router.map_callrw('echo', {1}, big_timeout_opts)
+ | ---
+ | - <replicaset_2>:
+ |   - 1
+ |   <replicaset_1>:
+ |   - 1
+ | ...
+
+--
+-- Fail at ref stage handling. Unrefs are sent to cancel those refs which
+-- succeeded. To simulate a ref fail make the router think there is a moving
+-- bucket.
+--
+_ = test_run:switch('storage_1_a')
+ | ---
+ | ...
+lsched = require('vshard.storage.sched')
+ | ---
+ | ...
+big_timeout = 1000000
+ | ---
+ | ...
+lsched.move_start(big_timeout)
+ | ---
+ | - 1000000
+ | ...
+
+_ = test_run:switch('router_1')
+ | ---
+ | ...
+ok, err = vshard.router.map_callrw('echo', {1}, timeout_opts)
+ | ---
+ | ...
+assert(not ok and err.message)
+ | ---
+ | - Timeout exceeded
+ | ...
+
+_ = test_run:switch('storage_2_a')
+ | ---
+ | ...
+lsched = require('vshard.storage.sched')
+ | ---
+ | ...
+lref = require('vshard.storage.ref')
+ | ---
+ | ...
+test_run:wait_cond(function() return lref.count == 0 end)
+ | ---
+ | - true
+ | ...
+
+--
+-- Do all the same with another storage being busy with a 'move'.
+--
+big_timeout = 1000000
+ | ---
+ | ...
+lsched.move_start(big_timeout)
+ | ---
+ | - 1000000
+ | ...
+
+_ = test_run:switch('storage_1_a')
+ | ---
+ | ...
+lref = require('vshard.storage.ref')
+ | ---
+ | ...
+lsched.move_end(1)
+ | ---
+ | ...
+assert(lref.count == 0)
+ | ---
+ | - true
+ | ...
+
+_ = test_run:switch('router_1')
+ | ---
+ | ...
+ok, err = vshard.router.map_callrw('echo', {1}, timeout_opts)
+ | ---
+ | ...
+assert(not ok and err.message)
+ | ---
+ | - Timeout exceeded
+ | ...
+
+_ = test_run:switch('storage_1_a')
+ | ---
+ | ...
+test_run:wait_cond(function() return lref.count == 0 end)
+ | ---
+ | - true
+ | ...
+
+_ = test_run:switch('storage_2_a')
+ | ---
+ | ...
+lsched.move_end(1)
+ | ---
+ | ...
+assert(lref.count == 0)
+ | ---
+ | - true
+ | ...
+
+_ = test_run:switch('router_1')
+ | ---
+ | ...
+vshard.router.map_callrw('echo', {1}, big_timeout_opts)
+ | ---
+ | - <replicaset_2>:
+ |   - 1
+ |   <replicaset_1>:
+ |   - 1
+ | ...
+
+--
+-- Ref can fail earlier than by a timeout. Router still should broadcast unrefs
+-- correctly. To simulate ref fail add a duplicate manually.
+--
+_ = test_run:switch('storage_1_a')
+ | ---
+ | ...
+box.schema.user.grant('storage', 'super')
+ | ---
+ | ...
+router_sid = nil
+ | ---
+ | ...
+function save_router_sid()                                                      \
+    router_sid = box.session.id()                                               \
+end
+ | ---
+ | ...
+
+_ = test_run:switch('storage_2_a')
+ | ---
+ | ...
+box.schema.user.grant('storage', 'super')
+ | ---
+ | ...
+router_sid = nil
+ | ---
+ | ...
+function save_router_sid()                                                      \
+    router_sid = box.session.id()                                               \
+end
+ | ---
+ | ...
+
+_ = test_run:switch('router_1')
+ | ---
+ | ...
+vshard.router.map_callrw('save_router_sid', {}, big_timeout_opts)
+ | ---
+ | - []
+ | ...
+
+_ = test_run:switch('storage_1_a')
+ | ---
+ | ...
+lref.add(1, router_sid, big_timeout)
+ | ---
+ | - true
+ | ...
+
+_ = test_run:switch('router_1')
+ | ---
+ | ...
+vshard.router.internal.ref_id = 1
+ | ---
+ | ...
+ok, err = vshard.router.map_callrw('echo', {1}, big_timeout_opts)
+ | ---
+ | ...
+assert(not ok and err.message)
+ | ---
+ | - 'Can not add a storage ref: duplicate ref'
+ | ...
+
+_ = test_run:switch('storage_1_a')
+ | ---
+ | ...
+_ = lref.del(1, router_sid)
+ | ---
+ | ...
+
+_ = test_run:switch('storage_2_a')
+ | ---
+ | ...
+test_run:wait_cond(function() return lref.count == 0 end)
+ | ---
+ | - true
+ | ...
+lref.add(1, router_sid, big_timeout)
+ | ---
+ | - true
+ | ...
+
+_ = test_run:switch('router_1')
+ | ---
+ | ...
+vshard.router.internal.ref_id = 1
+ | ---
+ | ...
+ok, err = vshard.router.map_callrw('echo', {1}, big_timeout_opts)
+ | ---
+ | ...
+assert(not ok and err.message)
+ | ---
+ | - 'Can not add a storage ref: duplicate ref'
+ | ...
+
+_ = test_run:switch('storage_2_a')
+ | ---
+ | ...
+_ = lref.del(1, router_sid)
+ | ---
+ | ...
+
+_ = test_run:switch('storage_1_a')
+ | ---
+ | ...
+test_run:wait_cond(function() return lref.count == 0 end)
+ | ---
+ | - true
+ | ...
+
+--
+-- Fail if some buckets are not visible. Even if all the known replicasets were
+-- scanned. It means consistency violation.
+--
+_ = test_run:switch('storage_1_a')
+ | ---
+ | ...
+bucket_id = box.space._bucket.index.pk:min().id
+ | ---
+ | ...
+vshard.storage.bucket_force_drop(bucket_id)
+ | ---
+ | - true
+ | ...
+
+_ = test_run:switch('router_1')
+ | ---
+ | ...
+ok, err = vshard.router.map_callrw('echo', {1}, big_timeout_opts)
+ | ---
+ | ...
+assert(not ok and err.message)
+ | ---
+ | - 1 buckets are not discovered
+ | ...
+
+_ = test_run:switch('storage_1_a')
+ | ---
+ | ...
+test_run:wait_cond(function() return lref.count == 0 end)
+ | ---
+ | - true
+ | ...
+vshard.storage.bucket_force_create(bucket_id)
+ | ---
+ | - true
+ | ...
+
+_ = test_run:switch('storage_2_a')
+ | ---
+ | ...
+test_run:wait_cond(function() return lref.count == 0 end)
+ | ---
+ | - true
+ | ...
+bucket_id = box.space._bucket.index.pk:min().id
+ | ---
+ | ...
+vshard.storage.bucket_force_drop(bucket_id)
+ | ---
+ | - true
+ | ...
+
+_ = test_run:switch('router_1')
+ | ---
+ | ...
+ok, err = vshard.router.map_callrw('echo', {1}, big_timeout_opts)
+ | ---
+ | ...
+assert(not ok and err.message)
+ | ---
+ | - 1 buckets are not discovered
+ | ...
+
+_ = test_run:switch('storage_2_a')
+ | ---
+ | ...
+test_run:wait_cond(function() return lref.count == 0 end)
+ | ---
+ | - true
+ | ...
+vshard.storage.bucket_force_create(bucket_id)
+ | ---
+ | - true
+ | ...
+
+_ = test_run:switch('storage_1_a')
+ | ---
+ | ...
+test_run:wait_cond(function() return lref.count == 0 end)
+ | ---
+ | - true
+ | ...
+
+--
+-- Storage map unit tests.
+--
+
+-- Map fails not being able to use the ref.
+ok, err = vshard.storage._call('storage_map', 0, 'echo', {1})
+ | ---
+ | ...
+ok, err.message
+ | ---
+ | - null
+ | - 'Can not use a storage ref: no session'
+ | ...
+
+-- Map fails and clears the ref when the user function fails.
+vshard.storage._call('storage_ref', 0, big_timeout)
+ | ---
+ | - 1500
+ | ...
+assert(lref.count == 1)
+ | ---
+ | - true
+ | ...
+ok, err = vshard.storage._call('storage_map', 0, 'raise_client_error', {})
+ | ---
+ | ...
+assert(lref.count == 0)
+ | ---
+ | - true
+ | ...
+assert(not ok and err.message)
+ | ---
+ | - Unknown error
+ | ...
+
+-- Map fails gracefully when couldn't delete the ref.
+vshard.storage._call('storage_ref', 0, big_timeout)
+ | ---
+ | - 1500
+ | ...
+ok, err = vshard.storage._call('storage_map', 0, 'vshard.storage._call',        \
+                               {'storage_unref', 0})
+ | ---
+ | ...
+assert(lref.count == 0)
+ | ---
+ | - true
+ | ...
+assert(not ok and err.message)
+ | ---
+ | - 'Can not delete a storage ref: no ref'
+ | ...
+
+--
+-- Map fail is handled and the router tries to send unrefs.
+--
+_ = test_run:switch('storage_1_a')
+ | ---
+ | ...
+need_throw = true
+ | ---
+ | ...
+function map_throw()                                                            \
+    if need_throw then                                                          \
+        raise_client_error()                                                    \
+    end                                                                         \
+    return '+'                                                                  \
+end
+ | ---
+ | ...
+
+_ = test_run:switch('storage_2_a')
+ | ---
+ | ...
+need_throw = false
+ | ---
+ | ...
+function map_throw()                                                            \
+    if need_throw then                                                          \
+        raise_client_error()                                                    \
+    end                                                                         \
+    return '+'                                                                  \
+end
+ | ---
+ | ...
+
+_ = test_run:switch('router_1')
+ | ---
+ | ...
+ok, err = vshard.router.map_callrw('raise_client_error', {}, big_timeout_opts)
+ | ---
+ | ...
+ok, err.message
+ | ---
+ | - null
+ | - Unknown error
+ | ...
+
+_ = test_run:switch('storage_1_a')
+ | ---
+ | ...
+need_throw = false
+ | ---
+ | ...
+test_run:wait_cond(function() return lref.count == 0 end)
+ | ---
+ | - true
+ | ...
+
+_ = test_run:switch('storage_2_a')
+ | ---
+ | ...
+need_throw = true
+ | ---
+ | ...
+test_run:wait_cond(function() return lref.count == 0 end)
+ | ---
+ | - true
+ | ...
+
+_ = test_run:switch('router_1')
+ | ---
+ | ...
+ok, err = vshard.router.map_callrw('raise_client_error', {}, big_timeout_opts)
+ | ---
+ | ...
+ok, err.message
+ | ---
+ | - null
+ | - Unknown error
+ | ...
+
+_ = test_run:switch('storage_1_a')
+ | ---
+ | ...
+test_run:wait_cond(function() return lref.count == 0 end)
+ | ---
+ | - true
+ | ...
+
+_ = test_run:switch('storage_2_a')
+ | ---
+ | ...
+test_run:wait_cond(function() return lref.count == 0 end)
+ | ---
+ | - true
+ | ...
+
+_ = test_run:switch('default')
+ | ---
+ | ...
+_ = test_run:cmd("stop server router_1")
+ | ---
+ | ...
+_ = test_run:cmd("cleanup server router_1")
+ | ---
+ | ...
+test_run:drop_cluster(REPLICASET_1)
+ | ---
+ | ...
+test_run:drop_cluster(REPLICASET_2)
+ | ---
+ | ...
+_ = test_run:cmd('clear filter')
+ | ---
+ | ...
diff --git a/test/router/map-reduce.test.lua b/test/router/map-reduce.test.lua
new file mode 100644
index 0000000..3b63248
--- /dev/null
+++ b/test/router/map-reduce.test.lua
@@ -0,0 +1,258 @@
+test_run = require('test_run').new()
+REPLICASET_1 = { 'storage_1_a', 'storage_1_b' }
+REPLICASET_2 = { 'storage_2_a', 'storage_2_b' }
+test_run:create_cluster(REPLICASET_1, 'router')
+test_run:create_cluster(REPLICASET_2, 'router')
+util = require('util')
+util.wait_master(test_run, REPLICASET_1, 'storage_1_a')
+util.wait_master(test_run, REPLICASET_2, 'storage_2_a')
+util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, 'bootstrap_storage()')
+util.push_rs_filters(test_run)
+_ = test_run:cmd("create server router_1 with script='router/router_1.lua'")
+_ = test_run:cmd("start server router_1")
+
+_ = test_run:switch("router_1")
+util = require('util')
+
+--
+-- gh-147: consistent map-reduce.
+--
+big_timeout = 1000000
+big_timeout_opts = {timeout = big_timeout}
+vshard.router.cfg(cfg)
+vshard.router.bootstrap(big_timeout_opts)
+-- Trivial basic sanity test. Multireturn is not supported, should be truncated.
+vshard.router.map_callrw('echo', {1, 2, 3}, big_timeout_opts)
+
+--
+-- Fail during connecting to storages. For the succeeded storages the router
+-- tries to send unref.
+--
+timeout = 0.001
+timeout_opts = {timeout = timeout}
+
+test_run:cmd('stop server storage_1_a')
+ok, err = vshard.router.map_callrw('echo', {1}, timeout_opts)
+assert(not ok and err.message)
+-- Even if ref was sent successfully to storage_2_a, it was deleted before
+-- router returned an error.
+_ = test_run:switch('storage_2_a')
+lref = require('vshard.storage.ref')
+-- Wait because unref is sent asynchronously. Could arrive not immediately.
+test_run:wait_cond(function() return lref.count == 0 end)
+
+_ = test_run:switch('router_1')
+test_run:cmd('start server storage_1_a')
+-- Works again - router waited for connection being established.
+vshard.router.map_callrw('echo', {1}, big_timeout_opts)
+
+--
+-- Do all the same but with another storage being stopped. The same test is done
+-- again because can't tell at which of the tests to where the router will go
+-- first.
+--
+test_run:cmd('stop server storage_2_a')
+ok, err = vshard.router.map_callrw('echo', {1}, timeout_opts)
+assert(not ok and err.message)
+_ = test_run:switch('storage_1_a')
+lref = require('vshard.storage.ref')
+test_run:wait_cond(function() return lref.count == 0 end)
+
+_ = test_run:switch('router_1')
+test_run:cmd('start server storage_2_a')
+vshard.router.map_callrw('echo', {1}, big_timeout_opts)
+
+--
+-- Fail at ref stage handling. Unrefs are sent to cancel those refs which
+-- succeeded. To simulate a ref fail make the router think there is a moving
+-- bucket.
+--
+_ = test_run:switch('storage_1_a')
+lsched = require('vshard.storage.sched')
+big_timeout = 1000000
+lsched.move_start(big_timeout)
+
+_ = test_run:switch('router_1')
+ok, err = vshard.router.map_callrw('echo', {1}, timeout_opts)
+assert(not ok and err.message)
+
+_ = test_run:switch('storage_2_a')
+lsched = require('vshard.storage.sched')
+lref = require('vshard.storage.ref')
+test_run:wait_cond(function() return lref.count == 0 end)
+
+--
+-- Do all the same with another storage being busy with a 'move'.
+--
+big_timeout = 1000000
+lsched.move_start(big_timeout)
+
+_ = test_run:switch('storage_1_a')
+lref = require('vshard.storage.ref')
+lsched.move_end(1)
+assert(lref.count == 0)
+
+_ = test_run:switch('router_1')
+ok, err = vshard.router.map_callrw('echo', {1}, timeout_opts)
+assert(not ok and err.message)
+
+_ = test_run:switch('storage_1_a')
+test_run:wait_cond(function() return lref.count == 0 end)
+
+_ = test_run:switch('storage_2_a')
+lsched.move_end(1)
+assert(lref.count == 0)
+
+_ = test_run:switch('router_1')
+vshard.router.map_callrw('echo', {1}, big_timeout_opts)
+
+--
+-- Ref can fail earlier than by a timeout. Router still should broadcast unrefs
+-- correctly. To simulate ref fail add a duplicate manually.
+--
+_ = test_run:switch('storage_1_a')
+box.schema.user.grant('storage', 'super')
+router_sid = nil
+function save_router_sid()                                                      \
+    router_sid = box.session.id()                                               \
+end
+
+_ = test_run:switch('storage_2_a')
+box.schema.user.grant('storage', 'super')
+router_sid = nil
+function save_router_sid()                                                      \
+    router_sid = box.session.id()                                               \
+end
+
+_ = test_run:switch('router_1')
+vshard.router.map_callrw('save_router_sid', {}, big_timeout_opts)
+
+_ = test_run:switch('storage_1_a')
+lref.add(1, router_sid, big_timeout)
+
+_ = test_run:switch('router_1')
+vshard.router.internal.ref_id = 1
+ok, err = vshard.router.map_callrw('echo', {1}, big_timeout_opts)
+assert(not ok and err.message)
+
+_ = test_run:switch('storage_1_a')
+_ = lref.del(1, router_sid)
+
+_ = test_run:switch('storage_2_a')
+test_run:wait_cond(function() return lref.count == 0 end)
+lref.add(1, router_sid, big_timeout)
+
+_ = test_run:switch('router_1')
+vshard.router.internal.ref_id = 1
+ok, err = vshard.router.map_callrw('echo', {1}, big_timeout_opts)
+assert(not ok and err.message)
+
+_ = test_run:switch('storage_2_a')
+_ = lref.del(1, router_sid)
+
+_ = test_run:switch('storage_1_a')
+test_run:wait_cond(function() return lref.count == 0 end)
+
+--
+-- Fail if some buckets are not visible. Even if all the known replicasets were
+-- scanned. It means consistency violation.
+--
+_ = test_run:switch('storage_1_a')
+bucket_id = box.space._bucket.index.pk:min().id
+vshard.storage.bucket_force_drop(bucket_id)
+
+_ = test_run:switch('router_1')
+ok, err = vshard.router.map_callrw('echo', {1}, big_timeout_opts)
+assert(not ok and err.message)
+
+_ = test_run:switch('storage_1_a')
+test_run:wait_cond(function() return lref.count == 0 end)
+vshard.storage.bucket_force_create(bucket_id)
+
+_ = test_run:switch('storage_2_a')
+test_run:wait_cond(function() return lref.count == 0 end)
+bucket_id = box.space._bucket.index.pk:min().id
+vshard.storage.bucket_force_drop(bucket_id)
+
+_ = test_run:switch('router_1')
+ok, err = vshard.router.map_callrw('echo', {1}, big_timeout_opts)
+assert(not ok and err.message)
+
+_ = test_run:switch('storage_2_a')
+test_run:wait_cond(function() return lref.count == 0 end)
+vshard.storage.bucket_force_create(bucket_id)
+
+_ = test_run:switch('storage_1_a')
+test_run:wait_cond(function() return lref.count == 0 end)
+
+--
+-- Storage map unit tests.
+--
+
+-- Map fails not being able to use the ref.
+ok, err = vshard.storage._call('storage_map', 0, 'echo', {1})
+ok, err.message
+
+-- Map fails and clears the ref when the user function fails.
+vshard.storage._call('storage_ref', 0, big_timeout)
+assert(lref.count == 1)
+ok, err = vshard.storage._call('storage_map', 0, 'raise_client_error', {})
+assert(lref.count == 0)
+assert(not ok and err.message)
+
+-- Map fails gracefully when couldn't delete the ref.
+vshard.storage._call('storage_ref', 0, big_timeout)
+ok, err = vshard.storage._call('storage_map', 0, 'vshard.storage._call',        \
+                               {'storage_unref', 0})
+assert(lref.count == 0)
+assert(not ok and err.message)
+
+--
+-- Map fail is handled and the router tries to send unrefs.
+--
+_ = test_run:switch('storage_1_a')
+need_throw = true
+function map_throw()                                                            \
+    if need_throw then                                                          \
+        raise_client_error()                                                    \
+    end                                                                         \
+    return '+'                                                                  \
+end
+
+_ = test_run:switch('storage_2_a')
+need_throw = false
+function map_throw()                                                            \
+    if need_throw then                                                          \
+        raise_client_error()                                                    \
+    end                                                                         \
+    return '+'                                                                  \
+end
+
+_ = test_run:switch('router_1')
+ok, err = vshard.router.map_callrw('raise_client_error', {}, big_timeout_opts)
+ok, err.message
+
+_ = test_run:switch('storage_1_a')
+need_throw = false
+test_run:wait_cond(function() return lref.count == 0 end)
+
+_ = test_run:switch('storage_2_a')
+need_throw = true
+test_run:wait_cond(function() return lref.count == 0 end)
+
+_ = test_run:switch('router_1')
+ok, err = vshard.router.map_callrw('raise_client_error', {}, big_timeout_opts)
+ok, err.message
+
+_ = test_run:switch('storage_1_a')
+test_run:wait_cond(function() return lref.count == 0 end)
+
+_ = test_run:switch('storage_2_a')
+test_run:wait_cond(function() return lref.count == 0 end)
+
+_ = test_run:switch('default')
+_ = test_run:cmd("stop server router_1")
+_ = test_run:cmd("cleanup server router_1")
+test_run:drop_cluster(REPLICASET_1)
+test_run:drop_cluster(REPLICASET_2)
+_ = test_run:cmd('clear filter')
\ No newline at end of file
diff --git a/test/router/router.result b/test/router/router.result
index 3c1d073..f9ee37c 100644
--- a/test/router/router.result
+++ b/test/router/router.result
@@ -1163,14 +1163,15 @@ error_messages
 - - Use replicaset:callro(...) instead of replicaset.callro(...)
   - Use replicaset:connect_master(...) instead of replicaset.connect_master(...)
   - Use replicaset:callre(...) instead of replicaset.callre(...)
-  - Use replicaset:connect_replica(...) instead of replicaset.connect_replica(...)
   - Use replicaset:down_replica_priority(...) instead of replicaset.down_replica_priority(...)
-  - Use replicaset:callrw(...) instead of replicaset.callrw(...)
+  - Use replicaset:connect(...) instead of replicaset.connect(...)
+  - Use replicaset:wait_connected(...) instead of replicaset.wait_connected(...)
+  - Use replicaset:up_replica_priority(...) instead of replicaset.up_replica_priority(...)
   - Use replicaset:callbro(...) instead of replicaset.callbro(...)
   - Use replicaset:connect_all(...) instead of replicaset.connect_all(...)
+  - Use replicaset:connect_replica(...) instead of replicaset.connect_replica(...)
   - Use replicaset:call(...) instead of replicaset.call(...)
-  - Use replicaset:connect(...) instead of replicaset.connect(...)
-  - Use replicaset:up_replica_priority(...) instead of replicaset.up_replica_priority(...)
+  - Use replicaset:callrw(...) instead of replicaset.callrw(...)
   - Use replicaset:callbre(...) instead of replicaset.callbre(...)
 ...
 _, replica = next(replicaset.replicas)
diff --git a/test/upgrade/upgrade.result b/test/upgrade/upgrade.result
index c2d54a3..833da3f 100644
--- a/test/upgrade/upgrade.result
+++ b/test/upgrade/upgrade.result
@@ -162,9 +162,12 @@ vshard.storage._call ~= nil
 vshard.storage._call('test_api', 1, 2, 3)
  | ---
  | - bucket_recv: true
+ |   storage_ref: true
  |   rebalancer_apply_routes: true
- |   test_api: true
+ |   storage_map: true
  |   rebalancer_request_state: true
+ |   test_api: true
+ |   storage_unref: true
  | - 1
  | - 2
  | - 3
diff --git a/vshard/replicaset.lua b/vshard/replicaset.lua
index 7437e3b..56ea165 100644
--- a/vshard/replicaset.lua
+++ b/vshard/replicaset.lua
@@ -139,6 +139,39 @@ local function replicaset_connect_master(replicaset)
     return replicaset_connect_to_replica(replicaset, master)
 end
 
+--
+-- Wait until the master instance is connected. This is necessary at least for
+-- async requests because they fail immediately if the connection is not
+-- established.
+-- Returns the remaining timeout because is expected to be used to connect to
+-- many replicasets in a loop, where such return saves one clock get in the
+-- caller code and is just cleaner code.
+--
+local function replicaset_wait_connected(replicaset, timeout)
+    local deadline = fiber_clock() + timeout
+    local ok, res
+    while true do
+        local conn = replicaset_connect_master(replicaset)
+        if conn.state == 'active' then
+            return timeout
+        end
+        -- Netbox uses fiber_cond inside, which throws an irrelevant usage error
+        -- at negative timeout. Need to check the case manually.
+        if timeout < 0 then
+            return nil, lerror.timeout()
+        end
+        ok, res = pcall(conn.wait_connected, conn, timeout)
+        if not ok then
+            return nil, lerror.make(res)
+        end
+        if not res then
+            return nil, lerror.timeout()
+        end
+        timeout = deadline - fiber_clock()
+    end
+    assert(false)
+end
+
 --
 -- Create net.box connections to all replicas and master.
 --
@@ -483,6 +516,7 @@ local replicaset_mt = {
         connect_replica = replicaset_connect_to_replica;
         down_replica_priority = replicaset_down_replica_priority;
         up_replica_priority = replicaset_up_replica_priority;
+        wait_connected = replicaset_wait_connected,
         call = replicaset_master_call;
         callrw = replicaset_master_call;
         callro = replicaset_template_multicallro(false, false);
diff --git a/vshard/router/init.lua b/vshard/router/init.lua
index 97bcb0a..8abd77f 100644
--- a/vshard/router/init.lua
+++ b/vshard/router/init.lua
@@ -44,6 +44,11 @@ if not M then
         module_version = 0,
         -- Number of router which require collecting lua garbage.
         collect_lua_garbage_cnt = 0,
+
+        ----------------------- Map-Reduce -----------------------
+        -- Storage Ref ID. It must be unique for each ref request
+        -- and therefore is global and monotonically growing.
+        ref_id = 0,
     }
 end
 
@@ -674,6 +679,177 @@ local function router_call(router, bucket_id, opts, ...)
                             ...)
 end
 
+local router_map_callrw
+
+if util.version_is_at_least(1, 10, 0) then
+--
+-- Consistent Map-Reduce. The given function is called on all masters in the
+-- cluster with a guarantee that in case of success it was executed with all
+-- buckets being accessible for reads and writes.
+--
+-- Consistency in scope of map-reduce means all the data was accessible, and
+-- didn't move during map requests execution. To preserve the consistency there
+-- is a third stage - Ref. So the algorithm is actually Ref-Map-Reduce.
+--
+-- Refs are broadcast before Map stage to pin the buckets to their storages, and
+-- ensure they won't move until maps are done.
+--
+-- Map requests are broadcast in case all refs are done successfully. They
+-- execute the user function + delete the refs to enable rebalancing again.
+--
+-- On the storages there are additional means to ensure map-reduces don't block
+-- rebalancing forever and vice versa.
+--
+-- The function is not as slow as it may seem - it uses netbox's feature
+-- is_async to send refs and maps in parallel. So cost of the function is about
+-- 2 network exchanges to the most far storage in terms of time.
+--
+-- @param router Router instance to use.
+-- @param func Name of the function to call.
+-- @param args Function arguments passed in netbox style (as an array).
+-- @param opts Can only contain 'timeout' as a number of seconds. Note that the
+--     refs may end up being kept on the storages during this entire timeout if
+--     something goes wrong. For instance, network issues appear. This means
+--     better not use a value bigger than necessary. A stuck infinite ref can
+--     only be dropped by this router restart/reconnect or the storage restart.
+--
+-- @return In case of success - a map with replicaset UUID keys and values being
+--     what the function returned from the replicaset.
+--
+-- @return In case of an error - nil, error object, optional UUID of the
+--     replicaset where the error happened. UUID may be not present if it wasn't
+--     about concrete replicaset. For example, not all buckets were found even
+--     though all replicasets were scanned.
+--
+router_map_callrw = function(router, func, args, opts)
+    local replicasets = router.replicasets
+    local timeout = opts and opts.timeout or consts.CALL_TIMEOUT_MIN
+    local deadline = fiber_clock() + timeout
+    local err, err_uuid, res, ok, map
+    local futures = {}
+    local bucket_count = 0
+    local opts_async = {is_async = true}
+    local rs_count = 0
+    local rid = M.ref_id
+    M.ref_id = rid + 1
+    -- Nil checks are done explicitly here (== nil instead of 'not'), because
+    -- netbox requests return box.NULL instead of nils.
+
+    --
+    -- Ref stage: send.
+    --
+    for uuid, rs in pairs(replicasets) do
+        -- Netbox async requests work only with active connections. Need to wait
+        -- for the connection explicitly.
+        timeout, err = rs:wait_connected(timeout)
+        if timeout == nil then
+            err_uuid = uuid
+            goto fail
+        end
+        res, err = rs:callrw('vshard.storage._call',
+                              {'storage_ref', rid, timeout}, opts_async)
+        if res == nil then
+            err_uuid = uuid
+            goto fail
+        end
+        futures[uuid] = res
+        rs_count = rs_count + 1
+    end
+    map = table_new(0, rs_count)
+    --
+    -- Ref stage: collect.
+    --
+    for uuid, future in pairs(futures) do
+        res, err = future:wait_result(timeout)
+        -- Handle netbox error first.
+        if res == nil then
+            err_uuid = uuid
+            goto fail
+        end
+        -- Ref returns nil,err or bucket count.
+        res, err = unpack(res)
+        if res == nil then
+            err_uuid = uuid
+            goto fail
+        end
+        bucket_count = bucket_count + res
+        timeout = deadline - fiber_clock()
+    end
+    -- All refs are done but not all buckets are covered. This is odd and can
+    -- mean many things. The most possible ones: 1) outdated configuration on
+    -- the router and it does not see another replicaset with more buckets,
+    -- 2) some buckets are simply lost or duplicated - could happen as a bug, or
+    -- if the user does a maintenance of some kind by creating/deleting buckets.
+    -- In both cases can't guarantee all the data would be covered by Map calls.
+    if bucket_count ~= router.total_bucket_count then
+        err = lerror.vshard(lerror.code.UNKNOWN_BUCKETS,
+                            router.total_bucket_count - bucket_count)
+        goto fail
+    end
+    --
+    -- Map stage: send.
+    --
+    args = {'storage_map', rid, func, args}
+    for uuid, rs in pairs(replicasets) do
+        res, err = rs:callrw('vshard.storage._call', args, opts_async)
+        if res == nil then
+            err_uuid = uuid
+            goto fail
+        end
+        futures[uuid] = res
+    end
+    --
+    -- Ref stage: collect.
+    --
+    for uuid, f in pairs(futures) do
+        res, err = f:wait_result(timeout)
+        if res == nil then
+            err_uuid = uuid
+            goto fail
+        end
+        -- Map returns true,res or nil,err.
+        ok, res = unpack(res)
+        if ok == nil then
+            err = res
+            err_uuid = uuid
+            goto fail
+        end
+        if res ~= nil then
+            -- Store as a table so in future it could be extended for
+            -- multireturn.
+            map[uuid] = {res}
+        end
+        timeout = deadline - fiber_clock()
+    end
+    do return map end
+
+::fail::
+    for uuid, f in pairs(futures) do
+        f:discard()
+        -- Best effort to remove the created refs before exiting. Can help if
+        -- the timeout was big and the error happened early.
+        f = replicasets[uuid]:callrw('vshard.storage._call',
+                                     {'storage_unref', rid}, opts_async)
+        if f ~= nil then
+            -- Don't care waiting for a result - no time for this. But it won't
+            -- affect the request sending if the connection is still alive.
+            f:discard()
+        end
+    end
+    err = lerror.make(err)
+    return nil, err, err_uuid
+end
+
+-- Version >= 1.10.
+else
+-- Version < 1.10.
+
+router_map_callrw = function()
+    error('Supported for Tarantool >= 1.10')
+end
+
+end
+
 --
 -- Get replicaset object by bucket identifier.
 -- @param bucket_id Bucket identifier.
@@ -1268,6 +1444,7 @@ local router_mt = {
         callrw = router_callrw;
         callre = router_callre;
         callbre = router_callbre;
+        map_callrw = router_map_callrw,
         route = router_route;
         routeall = router_routeall;
         bucket_id = router_bucket_id,
@@ -1365,6 +1542,9 @@ end
 if not rawget(_G, MODULE_INTERNALS) then
     rawset(_G, MODULE_INTERNALS, M)
 else
+    if not M.ref_id then
+        M.ref_id = 0
+    end
     for _, router in pairs(M.routers) do
         router_cfg(router, router.current_cfg, true)
         setmetatable(router, router_mt)
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index 31f668f..0a14440 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -2415,6 +2415,50 @@ local function storage_call(bucket_id, mode, name, args)
     return ok, ret1, ret2, ret3
 end
 
+--
+-- Bind a new storage ref to the current box session. Is used as a part of
+-- Map-Reduce API.
+--
+local function storage_ref(rid, timeout)
+    local ok, err = lref.add(rid, box.session.id(), timeout)
+    if not ok then
+        return nil, err
+    end
+    return bucket_count()
+end
+
+--
+-- Drop a storage ref from the current box session. Is used as a part of
+-- Map-Reduce API.
+--
+local function storage_unref(rid)
+    return lref.del(rid, box.session.id())
+end
+
+--
+-- Execute a user's function under an infinite storage ref protecting from
+-- bucket moves. The ref should exist before, and is deleted after, regardless
+-- of the function result. Is used as a part of Map-Reduce API.
+--
+local function storage_map(rid, name, args)
+    local ok, err, res
+    local sid = box.session.id()
+    ok, err = lref.use(rid, sid)
+    if not ok then
+        return nil, err
+    end
+    ok, res = local_call(name, args)
+    if not ok then
+        lref.del(rid, sid)
+        return nil, lerror.make(res)
+    end
+    ok, err = lref.del(rid, sid)
+    if not ok then
+        return nil, err
+    end
+    return true, res
+end
+
 local service_call_api
 
 local function service_call_test_api(...)
@@ -2425,6 +2469,9 @@ service_call_api = setmetatable({
     bucket_recv = bucket_recv,
     rebalancer_apply_routes = rebalancer_apply_routes,
     rebalancer_request_state = rebalancer_request_state,
+    storage_ref = storage_ref,
+    storage_unref = storage_unref,
+    storage_map = storage_map,
     test_api = service_call_test_api,
 }, {__serialize = function(api)
     local res = {}
-- 
2.24.3 (Apple Git-128)


  parent reply	other threads:[~2021-02-23  0:17 UTC|newest]

Thread overview: 47+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2021-02-23  0:15 [Tarantool-patches] [PATCH vshard 00/11] VShard Map-Reduce, part 2: Ref, Sched, Map Vladislav Shpilevoy via Tarantool-patches
2021-02-23  0:15 ` [Tarantool-patches] [PATCH vshard 01/11] error: introduce vshard.error.timeout() Vladislav Shpilevoy via Tarantool-patches
2021-02-24 10:27   ` Oleg Babin via Tarantool-patches
2021-02-24 21:46     ` Vladislav Shpilevoy via Tarantool-patches
2021-02-25 12:42       ` Oleg Babin via Tarantool-patches
2021-02-23  0:15 ` [Tarantool-patches] [PATCH vshard 10/11] sched: introduce vshard.storage.sched module Vladislav Shpilevoy via Tarantool-patches
2021-02-24 10:28   ` Oleg Babin via Tarantool-patches
2021-02-24 21:50     ` Vladislav Shpilevoy via Tarantool-patches
2021-03-04 21:02   ` Oleg Babin via Tarantool-patches
2021-03-05 22:06     ` Vladislav Shpilevoy via Tarantool-patches
2021-03-09  8:03       ` Oleg Babin via Tarantool-patches
2021-02-23  0:15 ` Vladislav Shpilevoy via Tarantool-patches [this message]
2021-02-24 10:28   ` [Tarantool-patches] [PATCH vshard 11/11] router: introduce map_callrw() Oleg Babin via Tarantool-patches
2021-02-24 22:04     ` Vladislav Shpilevoy via Tarantool-patches
2021-02-25 12:43       ` Oleg Babin via Tarantool-patches
2021-02-26 23:58         ` Vladislav Shpilevoy via Tarantool-patches
2021-03-01 10:58           ` Oleg Babin via Tarantool-patches
2021-02-23  0:15 ` [Tarantool-patches] [PATCH vshard 02/11] storage: add helper for local functions invocation Vladislav Shpilevoy via Tarantool-patches
2021-02-24 10:27   ` Oleg Babin via Tarantool-patches
2021-02-23  0:15 ` [Tarantool-patches] [PATCH vshard 03/11] storage: cache bucket count Vladislav Shpilevoy via Tarantool-patches
2021-02-24 10:27   ` Oleg Babin via Tarantool-patches
2021-02-24 21:47     ` Vladislav Shpilevoy via Tarantool-patches
2021-02-25 12:42       ` Oleg Babin via Tarantool-patches
2021-02-23  0:15 ` [Tarantool-patches] [PATCH vshard 04/11] registry: module for circular deps resolution Vladislav Shpilevoy via Tarantool-patches
2021-02-24 10:27   ` Oleg Babin via Tarantool-patches
2021-02-23  0:15 ` [Tarantool-patches] [PATCH vshard 05/11] util: introduce safe fiber_cond_wait() Vladislav Shpilevoy via Tarantool-patches
2021-02-24 10:27   ` Oleg Babin via Tarantool-patches
2021-02-24 21:48     ` Vladislav Shpilevoy via Tarantool-patches
2021-02-25 12:42       ` Oleg Babin via Tarantool-patches
2021-02-23  0:15 ` [Tarantool-patches] [PATCH vshard 06/11] util: introduce fiber_is_self_canceled() Vladislav Shpilevoy via Tarantool-patches
2021-02-24 10:27   ` Oleg Babin via Tarantool-patches
2021-02-23  0:15 ` [Tarantool-patches] [PATCH vshard 07/11] storage: introduce bucket_generation_wait() Vladislav Shpilevoy via Tarantool-patches
2021-02-24 10:27   ` Oleg Babin via Tarantool-patches
2021-02-23  0:15 ` [Tarantool-patches] [PATCH vshard 08/11] storage: introduce bucket_are_all_rw() Vladislav Shpilevoy via Tarantool-patches
2021-02-24 10:27   ` Oleg Babin via Tarantool-patches
2021-02-24 21:48     ` Vladislav Shpilevoy via Tarantool-patches
2021-02-23  0:15 ` [Tarantool-patches] [PATCH vshard 09/11] ref: introduce vshard.storage.ref module Vladislav Shpilevoy via Tarantool-patches
2021-02-24 10:28   ` Oleg Babin via Tarantool-patches
2021-02-24 21:49     ` Vladislav Shpilevoy via Tarantool-patches
2021-02-25 12:42       ` Oleg Babin via Tarantool-patches
2021-03-04 21:22   ` Oleg Babin via Tarantool-patches
2021-03-05 22:06     ` Vladislav Shpilevoy via Tarantool-patches
2021-03-09  8:03       ` Oleg Babin via Tarantool-patches
2021-03-21 18:49   ` Vladislav Shpilevoy via Tarantool-patches
2021-03-12 23:13 ` [Tarantool-patches] [PATCH vshard 00/11] VShard Map-Reduce, part 2: Ref, Sched, Map Vladislav Shpilevoy via Tarantool-patches
2021-03-15  7:05   ` Oleg Babin via Tarantool-patches
2021-03-28 18:17 ` Vladislav Shpilevoy via Tarantool-patches

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=f799cf4c753c0e271cfa9309aac4cd33a6a65aaf.1614039039.git.v.shpilevoy@tarantool.org \
    --to=tarantool-patches@dev.tarantool.org \
    --cc=olegrok@tarantool.org \
    --cc=v.shpilevoy@tarantool.org \
    --cc=yaroslav.dynnikov@tarantool.org \
    --subject='Re: [Tarantool-patches] [PATCH vshard 11/11] router: introduce map_callrw()' \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox