[Tarantool-patches] [PATCH vshard 1/2] router: wrap is_async futures completely
Oleg Babin
olegrok at tarantool.org
Wed Sep 29 09:18:44 MSK 2021
Thanks for your patch! I left three comments below.
On 29.09.2021 02:08, Vladislav Shpilevoy wrote:
> Router used to override :result() method of netbox futures. It is
> needed because user functions are called via vshard.storage.call()
> which returns some metadata - it must be truncated before
> returning the user's data.
>
> It worked fine while netbox futures were implemented as tables.
> But in the newest Tarantool most of netbox state machine code is
> moved into C. The futures now are cdata.
AFAIK they are userdata (not cdata), if that matters.
> They allow to add new members, but can't override their methods.
> As a result, on the newest Tarantool is_async in
> vshard.router.call() simply didn't work.
>
> The patch wraps netbox futures completely with a Lua table, not
> just overrides one method. Now it works the same on all Tarantool
> versions starting from 1.10.
>
> Closes #294
> ---
> test/lua_libs/storage_template.lua | 11 ++
> test/router/router.result | 301 ++++++++++++++++++++++++++++-
> test/router/router.test.lua | 120 +++++++++++-
> vshard/router/init.lua | 148 ++++++++++----
> 4 files changed, 542 insertions(+), 38 deletions(-)
>
> diff --git a/test/lua_libs/storage_template.lua b/test/lua_libs/storage_template.lua
> index 8df89f6..83ea710 100644
> --- a/test/lua_libs/storage_template.lua
...
> +
> --
> -- Test errors from router call.
> --
> diff --git a/vshard/router/init.lua b/vshard/router/init.lua
> index 3d468f3..42de740 100644
> --- a/vshard/router/init.lua
> +++ b/vshard/router/init.lua
> @@ -470,6 +470,114 @@ end
> -- API
> --------------------------------------------------------------------------------
>
I think the following changes could be moved into a separate file.
> +local function vshard_future_tostring(self)
> + return 'vshard.net.box.request'
> +end
> +
> +local function vshard_future_serialize(self)
> + -- Drop the metatable. It is also copied and if returned as is leads to
> + -- recursive serialization.
> + local s = setmetatable(table.deepcopy(self), {})
> + s._base = nil
> + return s
> +end
> +
> +local function vshard_future_is_ready(self)
> + return self._base:is_ready()
> +end
> +
> +local function vshard_future_wrap_result(res)
> + local storage_ok, res, err = unpack(res)
unpack() could be replaced with ... = res[1], res[2], res[3].
That should be a bit faster, since unpack() can't be JIT-compiled.
> + if storage_ok then
> + if res == nil and err ~= nil then
> + return nil, lerror.make(err)
> + end
> + return setmetatable({res}, seq_serializer)
> + end
> + return nil, lerror.make(res)
> +end
> +
> +local function vshard_future_result(self)
> + local res, err = self._base:result()
> + if res == nil then
> + return nil, lerror.make(err)
> + end
> + return vshard_future_wrap_result(res)
> +end
> +
> +local function vshard_future_wait_result(self, timeout)
> + local res, err = self._base:wait_result(timeout)
> + if res == nil then
> + return nil, lerror.make(err)
> + end
> + return vshard_future_wrap_result(res)
> +end
> +
> +local function vshard_future_discard(self)
> + return self._base:discard()
> +end
> +
> +local function vshard_future_iter_next(iter, i)
> + local res, err
> + local base_next = iter.base_next
> + local base_req = iter.base_req
> + local base = iter.base
> + -- Need to distinguish the last response from the pushes. Because the former
> + -- has metadata returned by vshard.storage.call().
> + -- At the same time there is no way to check if the base pairs() did its
> + -- last iteration except calling its next() function again.
> + -- This, in turn, might lead to a block if the result is not ready yet.
> + i, res = base_next(base, i)
> + -- To avoid that there is a 2-phase check.
> + -- If the request isn't finished after first next(), it means the result is
> + -- not received. This is a push. Return as is.
> + -- If the request is finished, it is safe to call next() again to check if
> + -- it ended. It won't block.
> + local is_done = base_req:is_ready()
> +
> + if not is_done then
> + -- Definitely a push. It would be finished if the final result was
> + -- received.
> + if i == nil then
> + return nil, lerror.make(res)
> + end
> + return i, res
> + end
> + if i == nil then
> + if res ~= nil then
> + return i, lerror.make(res)
> + end
> + return nil, nil
> + end
> + -- Will not block because the request is already finished.
> + if base_next(base, i) == nil then
> + res, err = vshard_future_wrap_result(res)
> + if res ~= nil then
> + return i, res
> + end
> + return i, {nil, lerror.make(err)}
> + end
> + return i, res
> +end
> +
> +local function vshard_future_pairs(self, timeout)
> + local next_f, iter, i = self._base:pairs(timeout)
> + return vshard_future_iter_next,
> + {base = iter, base_req = self, base_next = next_f}, i
> +end
> +
> +local vshard_future_mt = {
> + __tostring = vshard_future_tostring,
> + __serialize = vshard_future_serialize,
> + __index = {
> + is_ready = vshard_future_is_ready,
> + result = vshard_future_result,
> + wait_result = vshard_future_wait_result,
> + discard = vshard_future_discard,
> + pairs = vshard_future_pairs,
> + }
> +}
> +
> --
> -- Since 1.10 netbox supports flag 'is_async'. Given this flag, a
> -- request result is returned immediately in a form of a future
> @@ -482,41 +590,9 @@ end
> -- So vshard.router.call should wrap a future object with its own
> -- unpacker of result.
> --
> --- Unpack a result given from a future object from
> --- vshard.storage.call. If future returns [status, result, ...]
> --- this function returns [result]. Or a classical couple
> --- nil, error.
> ---
> -function future_storage_call_result(self)
> - local res, err = self:base_result()
> - if not res then
> - return nil, err
> - end
> - local storage_call_status, call_status, call_error = unpack(res)
> - if storage_call_status then
> - if call_status == nil and call_error ~= nil then
> - return call_status, call_error
> - else
> - return setmetatable({call_status}, seq_serializer)
> - end
> - end
> - return nil, call_status
> -end
> -
> ---
> --- Given a netbox future object, redefine its 'result' method.
> --- It is impossible to just create a new signle metatable per
> --- the module as a copy of original future's one because it has
> --- some upvalues related to the netbox connection.
> ---
> -local function wrap_storage_call_future(future)
> - -- Base 'result' below is got from __index metatable under the
> - -- hood. But __index is used only when a table has no such a
> - -- member in itself. So via adding 'result' as a member to a
> - -- future object its __index.result can be redefined.
> - future.base_result = future.result
> - future.result = future_storage_call_result
> - return future
> +local function vshard_future_new(future)
> + -- Use '_' as a prefix so as users could use all normal names.
> + return setmetatable({_base = future}, vshard_future_mt)
> end
>
> -- Perform shard operation
> @@ -574,7 +650,7 @@ local function router_call_impl(router, bucket_id, mode, prefer_replica,
> -- values: true/false and func result. But
> -- async returns future object. No true/false
> -- nor func result. So return the first value.
> - return wrap_storage_call_future(storage_call_status)
> + return vshard_future_new(storage_call_status)
> end
> end
> err = lerror.make(call_status)
More information about the Tarantool-patches
mailing list