[Tarantool-patches] [PATCH vshard 1/2] router: wrap is_async futures completely

Oleg Babin olegrok at tarantool.org
Wed Sep 29 09:18:44 MSK 2021


Thanks for your patch! I left three comments below.

On 29.09.2021 02:08, Vladislav Shpilevoy wrote:
> Router used to override :result() method of netbox futures. It is
> needed because user functions are called via vshard.storage.call()
> which returns some metadata - it must be truncated before
> returning the user's data.
>
> It worked fine while netbox futures were implemented as tables.
> But in the newest Tarantool most of netbox state machine code is
> moved into C. The futures now are cdata.
AFAIK they are userdata (not cdata), if that distinction matters here.
> They allow to add new members, but can't override their methods.
> As a result, on the newest Tarantool is_async in
> vshard.router.call() simply didn't work.
>
> The patch wraps netbox futures completely with a Lua table, not
> just overrides one method. Now it works the same on all Tarantool
> versions starting from 1.10.
>
> Closes #294
> ---
>   test/lua_libs/storage_template.lua |  11 ++
>   test/router/router.result          | 301 ++++++++++++++++++++++++++++-
>   test/router/router.test.lua        | 120 +++++++++++-
>   vshard/router/init.lua             | 148 ++++++++++----
>   4 files changed, 542 insertions(+), 38 deletions(-)
>
> diff --git a/test/lua_libs/storage_template.lua b/test/lua_libs/storage_template.lua
> index 8df89f6..83ea710 100644
> --- a/test/lua_libs/storage_template.lua
...
> +
>   --
>   -- Test errors from router call.
>   --
> diff --git a/vshard/router/init.lua b/vshard/router/init.lua
> index 3d468f3..42de740 100644
> --- a/vshard/router/init.lua
> +++ b/vshard/router/init.lua
> @@ -470,6 +470,114 @@ end
>   -- API
>   --------------------------------------------------------------------------------
>   

I think the following changes could be moved into a separate file.

> +local function vshard_future_tostring(self)
> +    return 'vshard.net.box.request'
> +end
> +
> +local function vshard_future_serialize(self)
> +    -- Drop the metatable. It is also copied and if returned as is leads to
> +    -- recursive serialization.
> +    local s = setmetatable(table.deepcopy(self), {})
> +    s._base = nil
> +    return s
> +end
> +
> +local function vshard_future_is_ready(self)
> +    return self._base:is_ready()
> +end
> +
> +local function vshard_future_wrap_result(res)
> +    local storage_ok, res, err = unpack(res)

unpack() could be replaced with ... = res[1], res[2], res[3].

It should be a bit faster, since unpack() can't be JIT-compiled.

> +    if storage_ok then
> +        if res == nil and err ~= nil then
> +            return nil, lerror.make(err)
> +        end
> +        return setmetatable({res}, seq_serializer)
> +    end
> +    return nil, lerror.make(res)
> +end
> +
> +local function vshard_future_result(self)
> +    local res, err = self._base:result()
> +    if res == nil then
> +        return nil, lerror.make(err)
> +    end
> +    return vshard_future_wrap_result(res)
> +end
> +
> +local function vshard_future_wait_result(self, timeout)
> +    local res, err = self._base:wait_result(timeout)
> +    if res == nil then
> +        return nil, lerror.make(err)
> +    end
> +    return vshard_future_wrap_result(res)
> +end
> +
> +local function vshard_future_discard(self)
> +    return self._base:discard()
> +end
> +
> +local function vshard_future_iter_next(iter, i)
> +    local res, err
> +    local base_next = iter.base_next
> +    local base_req = iter.base_req
> +    local base = iter.base
> +    -- Need to distinguish the last response from the pushes. Because the former
> +    -- has metadata returned by vshard.storage.call().
> +    -- At the same time there is no way to check if the base pairs() did its
> +    -- last iteration except calling its next() function again.
> +    -- This, in turn, might lead to a block if the result is not ready yet.
> +    i, res = base_next(base, i)
> +    -- To avoid that there is a 2-phase check.
> +    -- If the request isn't finished after first next(), it means the result is
> +    -- not received. This is a push. Return as is.
> +    -- If the request is finished, it is safe to call next() again to check if
> +    -- it ended. It won't block.
> +    local is_done = base_req:is_ready()
> +
> +    if not is_done then
> +        -- Definitely a push. It would be finished if the final result was
> +        -- received.
> +        if i == nil then
> +            return nil, lerror.make(res)
> +        end
> +        return i, res
> +    end
> +    if i == nil then
> +        if res ~= nil then
> +            return i, lerror.make(res)
> +        end
> +        return nil, nil
> +    end
> +    -- Will not block because the request is already finished.
> +    if base_next(base, i) == nil then
> +        res, err = vshard_future_wrap_result(res)
> +        if res ~= nil then
> +            return i, res
> +        end
> +        return i, {nil, lerror.make(err)}
> +    end
> +    return i, res
> +end
> +
> +local function vshard_future_pairs(self, timeout)
> +    local next_f, iter, i = self._base:pairs(timeout)
> +    return vshard_future_iter_next,
> +           {base = iter, base_req = self, base_next = next_f}, i
> +end
> +
> +local vshard_future_mt = {
> +    __tostring = vshard_future_tostring,
> +    __serialize = vshard_future_serialize,
> +    __index = {
> +        is_ready = vshard_future_is_ready,
> +        result = vshard_future_result,
> +        wait_result = vshard_future_wait_result,
> +        discard = vshard_future_discard,
> +        pairs = vshard_future_pairs,
> +    }
> +}
> +
>   --
>   -- Since 1.10 netbox supports flag 'is_async'. Given this flag, a
>   -- request result is returned immediately in a form of a future
> @@ -482,41 +590,9 @@ end
>   -- So vshard.router.call should wrap a future object with its own
>   -- unpacker of result.
>   --
> --- Unpack a result given from a future object from
> --- vshard.storage.call. If future returns [status, result, ...]
> --- this function returns [result]. Or a classical couple
> --- nil, error.
> ---
> -function future_storage_call_result(self)
> -    local res, err = self:base_result()
> -    if not res then
> -        return nil, err
> -    end
> -    local storage_call_status, call_status, call_error = unpack(res)
> -    if storage_call_status then
> -        if call_status == nil and call_error ~= nil then
> -            return call_status, call_error
> -        else
> -            return setmetatable({call_status}, seq_serializer)
> -        end
> -    end
> -    return nil, call_status
> -end
> -
> ---
> --- Given a netbox future object, redefine its 'result' method.
> --- It is impossible to just create a new signle metatable per
> --- the module as a copy of original future's one because it has
> --- some upvalues related to the netbox connection.
> ---
> -local function wrap_storage_call_future(future)
> -    -- Base 'result' below is got from __index metatable under the
> -    -- hood. But __index is used only when a table has no such a
> -    -- member in itself. So via adding 'result' as a member to a
> -    -- future object its __index.result can be redefined.
> -    future.base_result = future.result
> -    future.result = future_storage_call_result
> -    return future
> +local function vshard_future_new(future)
> +    -- Use '_' as a prefix so as users could use all normal names.
> +    return setmetatable({_base = future}, vshard_future_mt)
>   end
>   
>   -- Perform shard operation
> @@ -574,7 +650,7 @@ local function router_call_impl(router, bucket_id, mode, prefer_replica,
>                       -- values: true/false and func result. But
>                       -- async returns future object. No true/false
>                       -- nor func result. So return the first value.
> -                    return wrap_storage_call_future(storage_call_status)
> +                    return vshard_future_new(storage_call_status)
>                   end
>               end
>               err = lerror.make(call_status)


More information about the Tarantool-patches mailing list