[Tarantool-patches] [PATCH vshard 1/2] router: wrap is_async futures completely

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Wed Sep 29 02:08:56 MSK 2021


The router used to override the :result() method of netbox
futures. That is needed because user functions are called via
vshard.storage.call(), which returns some metadata - it must be
stripped before returning the user's data.

It worked fine while netbox futures were implemented as tables.
But in the newest Tarantool most of the netbox state machine code
has been moved into C. The futures are cdata now.

They allow adding new members, but their methods can't be
overridden. As a result, on the newest Tarantool is_async in
vshard.router.call() simply didn't work.

The patch wraps netbox futures completely into a Lua table instead
of just overriding one method. Now it works the same on all
Tarantool versions starting from 1.10.

Closes #294
---
 test/lua_libs/storage_template.lua |  11 ++
 test/router/router.result          | 301 ++++++++++++++++++++++++++++-
 test/router/router.test.lua        | 120 +++++++++++-
 vshard/router/init.lua             | 148 ++++++++++----
 4 files changed, 542 insertions(+), 38 deletions(-)

diff --git a/test/lua_libs/storage_template.lua b/test/lua_libs/storage_template.lua
index 8df89f6..83ea710 100644
--- a/test/lua_libs/storage_template.lua
+++ b/test/lua_libs/storage_template.lua
@@ -101,6 +101,8 @@ function bootstrap_storage(engine)
         box.schema.role.grant('public', 'execute', 'function', 'raise_client_error')
         box.schema.func.create('do_push')
         box.schema.role.grant('public', 'execute', 'function', 'do_push')
+        box.schema.func.create('do_push_wait')
+        box.schema.role.grant('public', 'execute', 'function', 'do_push_wait')
         box.snapshot()
     end)
 end
@@ -152,6 +154,15 @@ function do_push(push, retval)
     return retval
 end
 
+is_push_wait_blocked = true
+function do_push_wait(push, retval_arr)
+    box.session.push(push) -- Send the push before blocking.
+    while is_push_wait_blocked do -- Spin until the test clears the flag.
+        fiber.sleep(0.001)
+    end
+    return unpack(retval_arr) -- Array elements become the return values.
+end
+
 --
 -- Wait a specified log message.
 -- Requirements:
diff --git a/test/router/router.result b/test/router/router.result
index 8a0e30d..8ddbe6d 100644
--- a/test/router/router.result
+++ b/test/router/router.result
@@ -613,7 +613,7 @@ messages
 - - 100
 ...
 --
--- gh-171: support is_async.
+-- gh-171, gh-294: support is_async.
 --
 future = vshard.router.callro(bucket_id, 'space_get', {'test', {1}}, {is_async = true})
 ---
@@ -632,6 +632,11 @@ future = vshard.router.callrw(bucket_id, 'raise_client_error', {}, {is_async = t
 res, err = future:wait_result()
 ---
 ...
+-- VShard wraps all errors.
+assert(type(err) == 'table')
+---
+- true
+...
 util.portable_error(err)
 ---
 - type: ClientError
@@ -690,6 +695,300 @@ future:wait_result()
 - [[1, 1]]
 ...
 --
+-- Error as a result of discard.
+--
+future = vshard.router.callrw(bucket_id, 'do_push_wait', {10, {20}},            \
+                              {is_async = true})
+---
+...
+future:discard()
+---
+...
+res, err = future:result()
+---
+...
+assert(not res and err.message:match('discarded') ~= nil)
+---
+- true
+...
+assert(type(err) == 'table')
+---
+- true
+...
+res, err = future:wait_result()
+---
+...
+assert(not res and err.message:match('discarded') ~= nil)
+---
+- true
+...
+assert(type(err) == 'table')
+---
+- true
+...
+--
+-- See how pairs behaves when the final result is not immediately ready.
+--
+future = vshard.router.callrw(bucket_id, 'do_push_wait', {10, {20}},            \
+                              {is_async = true})
+---
+...
+assert(not future:is_ready())
+---
+- true
+...
+-- Get the push successfully.
+func, iter, i = future:pairs()
+---
+...
+i, res = func(iter, i)
+---
+...
+assert(i == 1)
+---
+- true
+...
+assert(res == 10)
+---
+- true
+...
+-- Fail to get the final result during the timeout. It is supposed to test how
+-- the router knows which result is final and which is just a push. Even before
+-- the request ends.
+func, iter, i = future:pairs(0.001)
+---
+...
+i, res = func(iter, i)
+---
+...
+i, res = func(iter, i)
+---
+...
+assert(not i and res.code == box.error.TIMEOUT)
+---
+- true
+...
+assert(type(res) == 'table')
+---
+- true
+...
+res, err = future:wait_result(0.001)
+---
+...
+assert(not res and err.code == box.error.TIMEOUT)
+---
+- true
+...
+assert(type(err) == 'table')
+---
+- true
+...
+test_run:switch('storage_1_a')
+---
+- true
+...
+is_push_wait_blocked = false
+---
+...
+test_run:switch('storage_2_a')
+---
+- true
+...
+is_push_wait_blocked = false
+---
+...
+test_run:switch('router_1')
+---
+- true
+...
+func, iter, i = future:pairs()
+---
+...
+i, res = func(iter, i)
+---
+...
+assert(i == 1)
+---
+- true
+...
+assert(res == 10)
+---
+- true
+...
+i, res = func(iter, i)
+---
+...
+assert(i == 2)
+---
+- true
+...
+assert(res[1] == 20 and not res[2])
+---
+- true
+...
+assert(future:is_ready())
+---
+- true
+...
+i, res = func(iter, i)
+---
+...
+assert(not i)
+---
+- true
+...
+assert(not res)
+---
+- true
+...
+-- Repeat the same to ensure it returns the same.
+i, res = func(iter, 1)
+---
+...
+assert(i == 2)
+---
+- true
+...
+assert(res[1] == 20 and not res[2])
+---
+- true
+...
+-- Non-pairs functions return correctly unpacked successful results.
+res, err = future:wait_result()
+---
+...
+assert(res[1] == 20 and not res[2] and not err)
+---
+- true
+...
+res, err = future:result()
+---
+...
+assert(res[1] == 20 and not res[2] and not err)
+---
+- true
+...
+-- Return 2 nils - shouldn't be treated as an error.
+future = vshard.router.callrw(bucket_id, 'do_push_wait',                        \
+                              {10, {nil, nil}}, {is_async = true})
+---
+...
+res, err = future:wait_result()
+---
+...
+assert(res[1] == nil and res[2] == nil and not err)
+---
+- true
+...
+res, err = future:result()
+---
+...
+assert(res[1] == nil and res[2] == nil and not err)
+---
+- true
+...
+func, iter, i = future:pairs()
+---
+...
+i, res = func(iter, i)
+---
+...
+i, res = func(iter, i)
+---
+...
+assert(res[1] == nil and res[2] == nil and not err)
+---
+- true
+...
+-- Serialize and tostring.
+future
+---
+- []
+...
+future.key = 'value'
+---
+...
+future
+---
+- key: value
+...
+tostring(future)
+---
+- vshard.net.box.request
+...
+--
+-- The same, but the push function returns an error.
+--
+future = vshard.router.callrw(bucket_id, 'do_push_wait', {10, {nil, 'err'}},    \
+                              {is_async = true})
+---
+...
+func, iter, i = future:pairs()
+---
+...
+i, res = func(iter, i)
+---
+...
+assert(i == 1)
+---
+- true
+...
+assert(res == 10)
+---
+- true
+...
+i, res = func(iter, i)
+---
+...
+-- This test is for the sake of checking how the async request handles nil,err
+-- result.
+assert(i == 2)
+---
+- true
+...
+assert(not res[1] and res[2].message == 'err')
+---
+- true
+...
+assert(type(res[2]) == 'table')
+---
+- true
+...
+i, res = func(iter, i)
+---
+...
+assert(not i)
+---
+- true
+...
+assert(not res)
+---
+- true
+...
+-- Non-pairs getting of an error.
+res, err = future:wait_result()
+---
+...
+assert(not res and err.message == 'err')
+---
+- true
+...
+assert(type(err) == 'table')
+---
+- true
+...
+res, err = future:result()
+---
+...
+assert(not res and err.message == 'err')
+---
+- true
+...
+assert(type(err) == 'table')
+---
+- true
+...
+--
 -- Test errors from router call.
 --
 new_bid = vshard.consts.DEFAULT_BUCKET_COUNT + 1
diff --git a/test/router/router.test.lua b/test/router/router.test.lua
index 0017111..0f3854a 100644
--- a/test/router/router.test.lua
+++ b/test/router/router.test.lua
@@ -216,13 +216,15 @@ vshard.router.route(bucket_id):callrw('do_push', args, opts)
 messages
 
 --
--- gh-171: support is_async.
+-- gh-171, gh-294: support is_async.
 --
 future = vshard.router.callro(bucket_id, 'space_get', {'test', {1}}, {is_async = true})
 future:wait_result()
 future:is_ready()
 future = vshard.router.callrw(bucket_id, 'raise_client_error', {}, {is_async = true})
 res, err = future:wait_result()
+-- VShard wraps all errors.
+assert(type(err) == 'table')
 util.portable_error(err)
 future:is_ready()
 future = vshard.router.callrw(bucket_id, 'do_push', args, {is_async = true})
@@ -240,6 +242,122 @@ future:wait_result()
 future = vshard.router.route(bucket_id):callrw('space_get', {'test', {1}}, {is_async = true})
 future:wait_result()
 
+--
+-- Error as a result of discard.
+--
+future = vshard.router.callrw(bucket_id, 'do_push_wait', {10, {20}},            \
+                              {is_async = true})
+future:discard()
+res, err = future:result()
+assert(not res and err.message:match('discarded') ~= nil)
+assert(type(err) == 'table')
+res, err = future:wait_result()
+assert(not res and err.message:match('discarded') ~= nil)
+assert(type(err) == 'table')
+
+--
+-- See how pairs behaves when the final result is not immediately ready.
+--
+future = vshard.router.callrw(bucket_id, 'do_push_wait', {10, {20}},            \
+                              {is_async = true})
+assert(not future:is_ready())
+-- Get the push successfully.
+func, iter, i = future:pairs()
+i, res = func(iter, i)
+assert(i == 1)
+assert(res == 10)
+
+-- Fail to get the final result during the timeout. It is supposed to test how
+-- the router knows which result is final and which is just a push. Even before
+-- the request ends.
+func, iter, i = future:pairs(0.001)
+i, res = func(iter, i)
+i, res = func(iter, i)
+assert(not i and res.code == box.error.TIMEOUT)
+assert(type(res) == 'table')
+
+res, err = future:wait_result(0.001)
+assert(not res and err.code == box.error.TIMEOUT)
+assert(type(err) == 'table')
+
+test_run:switch('storage_1_a')
+is_push_wait_blocked = false
+test_run:switch('storage_2_a')
+is_push_wait_blocked = false
+test_run:switch('router_1')
+
+func, iter, i = future:pairs()
+i, res = func(iter, i)
+assert(i == 1)
+assert(res == 10)
+
+i, res = func(iter, i)
+assert(i == 2)
+assert(res[1] == 20 and not res[2])
+
+assert(future:is_ready())
+
+i, res = func(iter, i)
+assert(not i)
+assert(not res)
+
+-- Repeat the same to ensure it returns the same.
+i, res = func(iter, 1)
+assert(i == 2)
+assert(res[1] == 20 and not res[2])
+
+-- Non-pairs functions return correctly unpacked successful results.
+res, err = future:wait_result()
+assert(res[1] == 20 and not res[2] and not err)
+res, err = future:result()
+assert(res[1] == 20 and not res[2] and not err)
+
+-- Return 2 nils - shouldn't be treated as an error.
+future = vshard.router.callrw(bucket_id, 'do_push_wait',                        \
+                              {10, {nil, nil}}, {is_async = true})
+res, err = future:wait_result()
+assert(res[1] == nil and res[2] == nil and not err)
+res, err = future:result()
+assert(res[1] == nil and res[2] == nil and not err)
+func, iter, i = future:pairs()
+i, res = func(iter, i)
+i, res = func(iter, i)
+assert(res[1] == nil and res[2] == nil and not err)
+
+-- Serialize and tostring.
+future
+future.key = 'value'
+future
+tostring(future)
+
+--
+-- The same, but the push function returns an error.
+--
+future = vshard.router.callrw(bucket_id, 'do_push_wait', {10, {nil, 'err'}},    \
+                              {is_async = true})
+func, iter, i = future:pairs()
+i, res = func(iter, i)
+assert(i == 1)
+assert(res == 10)
+i, res = func(iter, i)
+-- This test is for the sake of checking how the async request handles nil,err
+-- result.
+assert(i == 2)
+assert(not res[1] and res[2].message == 'err')
+assert(type(res[2]) == 'table')
+i, res = func(iter, i)
+assert(not i)
+assert(not res)
+
+-- Non-pairs getting of an error.
+res, err = future:wait_result()
+assert(not res and err.message == 'err')
+assert(type(err) == 'table')
+
+res, err = future:result()
+assert(not res and err.message == 'err')
+assert(type(err) == 'table')
+
 --
 -- Test errors from router call.
 --
diff --git a/vshard/router/init.lua b/vshard/router/init.lua
index 3d468f3..42de740 100644
--- a/vshard/router/init.lua
+++ b/vshard/router/init.lua
@@ -470,6 +470,114 @@ end
 -- API
 --------------------------------------------------------------------------------
 
+local function vshard_future_tostring(self) -- __tostring metamethod.
+    return 'vshard.net.box.request'
+end
+
+local function vshard_future_serialize(self)
+    -- The copy keeps this metatable with __serialize. If returned as is, it
+    -- leads to recursive serialization. So swap it for an empty metatable.
+    local s = setmetatable(table.deepcopy(self), {})
+    s._base = nil -- Hide the wrapped future, keep only the other members.
+    return s
+end
+
+local function vshard_future_is_ready(self) -- Proxy to the wrapped future.
+    return self._base:is_ready()
+end
+
+-- Unwrap a storage call response {ok, res, err} into {res} or nil, error.
+local function vshard_future_wrap_result(res)
+    local storage_ok, value, err = unpack(res)
+    if not storage_ok then
+        return nil, lerror.make(value)
+    elseif value == nil and err ~= nil then
+        return nil, lerror.make(err)
+    end
+    return setmetatable({value}, seq_serializer)
+end
+
+-- Get the result via the base future's :result() and strip the
+-- vshard.storage.call() metadata. Errors are passed through lerror.make().
+local function vshard_future_result(self)
+    local raw, err = self._base:result()
+    if raw == nil then return nil, lerror.make(err) end
+    return vshard_future_wrap_result(raw)
+end
+
+-- Get the result via the base future's :wait_result(timeout) and strip the
+-- vshard.storage.call() metadata. Errors are passed through lerror.make().
+local function vshard_future_wait_result(self, timeout)
+    local raw, err = self._base:wait_result(timeout)
+    if raw == nil then return nil, lerror.make(err) end
+    return vshard_future_wrap_result(raw)
+end
+
+local function vshard_future_discard(self) -- Proxy to the wrapped future.
+    return self._base:discard()
+end
+
+-- Iterator step for vshard futures: returns pushes as is and the final response
+-- with vshard.storage.call() metadata stripped. Errors go via lerror.make().
+local function vshard_future_iter_next(iter, i)
+    local res, err
+    local base_next, base = iter.base_next, iter.base
+    -- Need to distinguish the last response from the pushes. Because the former
+    -- has metadata returned by vshard.storage.call().
+    -- At the same time there is no way to check if the base pairs() did its
+    -- last iteration except calling its next() function again.
+    -- This, in turn, might lead to a block if the result is not ready yet.
+    i, res = base_next(base, i)
+    -- To avoid that there is a 2-phase check.
+    -- If the request isn't finished after first next(), it means the result is
+    -- not received. This is a push. Return as is.
+    -- If the request is finished, it is safe to call next() again to check if
+    -- it ended. It won't block.
+    if not iter.base_req:is_ready() then
+        -- A push: the request would be finished if the final result came.
+        if i == nil then
+            return nil, lerror.make(res)
+        end
+        return i, res
+    end
+    if i == nil then
+        if res ~= nil then
+            return i, lerror.make(res)
+        end
+        return nil, nil
+    end
+    -- Will not block because the request is already finished.
+    local next_i, next_err = base_next(base, i)
+    -- A failed request ends with nil + error after its pushes. Then the
+    -- current item is a push too - it has no metadata to unwrap.
+    if next_i == nil and next_err == nil then
+        res, err = vshard_future_wrap_result(res)
+        if res ~= nil then
+            return i, res
+        end
+        return i, {nil, lerror.make(err)}
+    end
+    return i, res
+end
+
+local function vshard_future_pairs(self, timeout)
+    local base_next, base_iter, start = self._base:pairs(timeout)
+    local state = {base = base_iter, base_req = self, base_next = base_next}
+    return vshard_future_iter_next, state, start -- Generic-for triplet.
+end
+
+local vshard_future_mt = { -- Set on objects made by vshard_future_new().
+    __tostring = vshard_future_tostring,
+    __serialize = vshard_future_serialize,
+    __index = {
+        is_ready = vshard_future_is_ready,
+        result = vshard_future_result,
+        wait_result = vshard_future_wait_result,
+        discard = vshard_future_discard,
+        pairs = vshard_future_pairs,
+    }
+}
+
 --
 -- Since 1.10 netbox supports flag 'is_async'. Given this flag, a
 -- request result is returned immediately in a form of a future
@@ -482,41 +590,9 @@ end
 -- So vshard.router.call should wrap a future object with its own
 -- unpacker of result.
 --
--- Unpack a result given from a future object from
--- vshard.storage.call. If future returns [status, result, ...]
--- this function returns [result]. Or a classical couple
--- nil, error.
---
-function future_storage_call_result(self)
-    local res, err = self:base_result()
-    if not res then
-        return nil, err
-    end
-    local storage_call_status, call_status, call_error = unpack(res)
-    if storage_call_status then
-        if call_status == nil and call_error ~= nil then
-            return call_status, call_error
-        else
-            return setmetatable({call_status}, seq_serializer)
-        end
-    end
-    return nil, call_status
-end
-
---
--- Given a netbox future object, redefine its 'result' method.
--- It is impossible to just create a new signle metatable per
--- the module as a copy of original future's one because it has
--- some upvalues related to the netbox connection.
---
-local function wrap_storage_call_future(future)
-    -- Base 'result' below is got from __index metatable under the
-    -- hood. But __index is used only when a table has no such a
-    -- member in itself. So via adding 'result' as a member to a
-    -- future object its __index.result can be redefined.
-    future.base_result = future.result
-    future.result = future_storage_call_result
-    return future
+local function vshard_future_new(future)
+    -- Prefix with '_' so that users are free to use all normal names.
+    return setmetatable({_base = future}, vshard_future_mt)
 end
 
 -- Perform shard operation
@@ -574,7 +650,7 @@ local function router_call_impl(router, bucket_id, mode, prefer_replica,
                     -- values: true/false and func result. But
                     -- async returns future object. No true/false
                     -- nor func result. So return the first value.
-                    return wrap_storage_call_future(storage_call_status)
+                    return vshard_future_new(storage_call_status)
                 end
             end
             err = lerror.make(call_status)
-- 
2.24.3 (Apple Git-128)



More information about the Tarantool-patches mailing list