From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from [87.239.111.99] (localhost [127.0.0.1]) by dev.tarantool.org (Postfix) with ESMTP id 47F2D6FC87; Wed, 29 Sep 2021 02:09:32 +0300 (MSK) DKIM-Filter: OpenDKIM Filter v2.11.0 dev.tarantool.org 47F2D6FC87 DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=tarantool.org; s=dev; t=1632870572; bh=tmKxcZXvYawWHg1bDcS2Uk+bRB1D6EJalPa69XXJGuA=; h=To:Date:In-Reply-To:References:Subject:List-Id:List-Unsubscribe: List-Archive:List-Post:List-Help:List-Subscribe:From:Reply-To: From; b=jqqV3SZsC9tQdGnk2UJ8OJHAooAjlzSPuGdRKcVxvWebrLYaJymBnqlS6qvW2aPbl pY1YP9UWAJtwcBszJMRZBVndyIGsyNJjdO+ip/Mop1umfva7GI67FfaTfwfeaZgG+o DEmNO8o8zcS5gKLZMx+gZ3AjxSeOd+G31OlFVKN8= Received: from smtpng1.i.mail.ru (smtpng1.i.mail.ru [94.100.181.251]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id 39C9D6FC87 for ; Wed, 29 Sep 2021 02:09:00 +0300 (MSK) DKIM-Filter: OpenDKIM Filter v2.11.0 dev.tarantool.org 39C9D6FC87 Received: by smtpng1.m.smailru.net with esmtpa (envelope-from ) id 1mVMDb-0004Sd-Ao; Wed, 29 Sep 2021 02:08:59 +0300 To: tarantool-patches@dev.tarantool.org, olegrok@tarantool.org Date: Wed, 29 Sep 2021 01:08:56 +0200 Message-Id: X-Mailer: git-send-email 2.24.3 (Apple Git-128) In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-7564579A: 78E4E2B564C1792B X-77F55803: 4F1203BC0FB41BD96A58C36AA2E99649BF631F26B0465AFD0E15652C7D51B98D182A05F538085040CEFFB5F6F3F7CD1586657B2673416E6DF5D5EAD61F06E5531C48EEE49A919868 X-7FA49CB5: FF5795518A3D127A4AD6D5ED66289B5278DA827A17800CE7DB84ED444C624799EA1F7E6F0F101C67BD4B6F7A4D31EC0BCC500DACC3FED6E28638F802B75D45FF8AA50765F79006374C960BD2D03BF0BDEA1F7E6F0F101C6723150C8DA25C47586E58E00D9D99D84E1BDDB23E98D2D38BBCA57AF85F7723F2581D423ADAB09FCA915557BB0D483BE1CC7F00164DA146DAFE8445B8C89999728AA50765F7900637F6B57BC7E64490618DEB871D839B7333395957E7521B51C2DFABB839C843B9C08941B15DA834481F8AA50765F7900637F6B57BC7E6449061A352F6E88A58FB86F5D81C698A659EA7E827F84554CEF5019E625A9149C048EE9ECD01F8117BC8BEE2021AF6380DFAD18AA50765F790063735872C767BF85DA227C277FBC8AE2E8BDCE939D40DBB93CA75ECD9A6C639B01B4E70A05D1297E1BBCB5012B2E24CD356 X-C1DE0DAB: C20DE7B7AB408E4181F030C43753B8186998911F362727C4C7A0BC55FA0FE5FC12140A200D236465FE0CFE7943F96FA906ED8426BA52F86FB1881A6453793CE9C32612AADDFBE061C61BE10805914D3804EBA3D8E7E5B87ABF8C51168CD8EBDBF80095D1E77F4578DC48ACC2A39D04F89CDFB48F4795C241BDAD6C7F3747799A X-C8649E89: 4E36BF7865823D7055A7F0CF078B5EC49A30900B95165D34EA882B598A2098115BC6BB4FA1D6DEB108119ED5157FF4310C963ED34AEEBBC600075C0EF4EEAD991D7E09C32AA3244C6BCE2693E8F64BABFB831397627A8585C3B3ADDA61883BB5729B2BEF169E0186 X-D57D3AED: 3ZO7eAau8CL7WIMRKs4sN3D3tLDjz0dLbV79QFUyzQ2Ujvy7cMT6pYYqY16iZVKkSc3dCLJ7zSJH7+u4VD18S7Vl4ZUrpaVfd2+vE6kuoey4m4VkSEu530nj6fImhcD4MUrOEAnl0W826KZ9Q+tr5ycPtXkTV4k65bRjmOUUP8cvGozZ33TWg5HZplvhhXbhDGzqmQDTd6OAevLeAnq3Ra9uf7zvY2zzsIhlcp/Y7m53TZgf2aB4JOg4gkr2biojWaDhU1ub98akL+7pU+r33Q== X-Mailru-Sender: 689FA8AB762F7393C37E3C1AEC41BA5D27FD5CEF7E1E9D3949A24B1C38DDA8563841015FED1DE5223CC9A89AB576DD93FB559BB5D741EB963CF37A108A312F5C27E8A8C3839CE0E267EA787935ED9F1B X-Mras: Ok Subject: [Tarantool-patches] [PATCH vshard 1/2] router: wrap is_async futures completely X-BeenThere: tarantool-patches@dev.tarantool.org X-Mailman-Version: 2.1.34 Precedence: list List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , From: Vladislav Shpilevoy via Tarantool-patches Reply-To: Vladislav Shpilevoy Errors-To: tarantool-patches-bounces@dev.tarantool.org Sender: "Tarantool-patches" Router used to override :result() method of netbox futures. It is needed because user functions are called via vshard.storage.call() which returns some metadata - it must be truncated before returning the user's data. It worked fine while netbox futures were implemented as tables. But in the newest Tarantool most of netbox state machine code is moved into C. The futures now are cdata. They allow to add new members, but can't override their methods. As a result, on the newest Tarantool is_async in vshard.router.call() simply didn't work. The patch wraps netbox futures completely with a Lua table, not just overrides one method. Now it works the same on all Tarantool versions starting from 1.10. Closes #294 --- test/lua_libs/storage_template.lua | 11 ++ test/router/router.result | 301 ++++++++++++++++++++++++++++- test/router/router.test.lua | 120 +++++++++++- vshard/router/init.lua | 148 ++++++++++---- 4 files changed, 542 insertions(+), 38 deletions(-) diff --git a/test/lua_libs/storage_template.lua b/test/lua_libs/storage_template.lua index 8df89f6..83ea710 100644 --- a/test/lua_libs/storage_template.lua +++ b/test/lua_libs/storage_template.lua @@ -101,6 +101,8 @@ function bootstrap_storage(engine) box.schema.role.grant('public', 'execute', 'function', 'raise_client_error') box.schema.func.create('do_push') box.schema.role.grant('public', 'execute', 'function', 'do_push') + box.schema.func.create('do_push_wait') + box.schema.role.grant('public', 'execute', 'function', 'do_push_wait') box.snapshot() end) end @@ -152,6 +154,15 @@ function do_push(push, retval) return retval end +is_push_wait_blocked = true +function do_push_wait(push, retval_arr) + box.session.push(push) + while is_push_wait_blocked do + fiber.sleep(0.001) + end + return unpack(retval_arr) +end + -- -- Wait a specified log message. -- Requirements: diff --git a/test/router/router.result b/test/router/router.result index 8a0e30d..8ddbe6d 100644 --- a/test/router/router.result +++ b/test/router/router.result @@ -613,7 +613,7 @@ messages - - 100 ... -- --- gh-171: support is_async. +-- gh-171, gh-294: support is_async. -- future = vshard.router.callro(bucket_id, 'space_get', {'test', {1}}, {is_async = true}) --- @@ -632,6 +632,11 @@ future = vshard.router.callrw(bucket_id, 'raise_client_error', {}, {is_async = t res, err = future:wait_result() --- ... +-- VShard wraps all errors. +assert(type(err) == 'table') +--- +- true +... util.portable_error(err) --- - type: ClientError @@ -690,6 +695,300 @@ future:wait_result() - [[1, 1]] ... -- +-- Error as a result of discard. +-- +future = vshard.router.callrw(bucket_id, 'do_push_wait', {10, {20}}, \ + {is_async = true}) +--- +... +future:discard() +--- +... +res, err = future:result() +--- +... +assert(not res and err.message:match('discarded') ~= nil) +--- +- true +... +assert(type(err) == 'table') +--- +- true +... +res, err = future:wait_result() +--- +... +assert(not res and err.message:match('discarded') ~= nil) +--- +- true +... +assert(type(err) == 'table') +--- +- true +... +-- +-- See how pairs behaves when the final result is not immediately ready. +-- +future = vshard.router.callrw(bucket_id, 'do_push_wait', {10, {20}}, \ + {is_async = true}) +--- +... +assert(not future:is_ready()) +--- +- true +... +-- Get the push successfully. +func, iter, i = future:pairs() +--- +... +i, res = func(iter, i) +--- +... +assert(i == 1) +--- +- true +... +assert(res == 10) +--- +- true +... +-- Fail to get the final result during the timeout. It is supposed to test how +-- the router knows which result is final and which is just a push. Even before +-- the request ends. +func, iter, i = future:pairs(0.001) +--- +... +i, res = func(iter, i) +--- +... +i, res = func(iter, i) +--- +... +assert(not i and res.code == box.error.TIMEOUT) +--- +- true +... +assert(type(res) == 'table') +--- +- true +... +res, err = future:wait_result(0.001) +--- +... +assert(not res and err.code == box.error.TIMEOUT) +--- +- true +... +assert(type(err) == 'table') +--- +- true +... +test_run:switch('storage_1_a') +--- +- true +... +is_push_wait_blocked = false +--- +... +test_run:switch('storage_2_a') +--- +- true +... +is_push_wait_blocked = false +--- +... +test_run:switch('router_1') +--- +- true +... +func, iter, i = future:pairs() +--- +... +i, res = func(iter, i) +--- +... +assert(i == 1) +--- +- true +... +assert(res == 10) +--- +- true +... +i, res = func(iter, i) +--- +... +assert(i == 2) +--- +- true +... +assert(res[1] == 20 and not res[2]) +--- +- true +... +assert(future:is_ready()) +--- +- true +... +i, res = func(iter, i) +--- +... +assert(not i) +--- +- true +... +assert(not res) +--- +- true +... +-- Repeat the same to ensure it returns the same. +i, res = func(iter, 1) +--- +... +assert(i == 2) +--- +- true +... +assert(res[1] == 20 and not res[2]) +--- +- true +... +-- Non-pairs functions return correctly unpacked successful results. +res, err = future:wait_result() +--- +... +assert(res[1] == 20 and not res[2] and not err) +--- +- true +... +res, err = future:result() +--- +... +assert(res[1] == 20 and not res[2] and not err) +--- +- true +... +-- Return 2 nils - shouldn't be treated as an error. +future = vshard.router.callrw(bucket_id, 'do_push_wait', \ + {10, {nil, nil}}, {is_async = true}) +--- +... +res, err = future:wait_result() +--- +... +assert(res[1] == nil and res[2] == nil and not err) +--- +- true +... +res, err = future:result() +--- +... +assert(res[1] == nil and res[2] == nil and not err) +--- +- true +... +func, iter, i = future:pairs() +--- +... +i, res = func(iter, i) +--- +... +i, res = func(iter, i) +--- +... +assert(res[1] == nil and res[2] == nil and not err) +--- +- true +... +-- Serialize and tostring. +future +--- +- [] +... +future.key = 'value' +--- +... +future +--- +- key: value +... +tostring(future) +--- +- vshard.net.box.request +... +-- +-- The same, but the push function returns an error. +-- +future = vshard.router.callrw(bucket_id, 'do_push_wait', {10, {nil, 'err'}}, \ + {is_async = true}) +--- +... +func, iter, i = future:pairs() +--- +... +i, res = func(iter, i) +--- +... +assert(i == 1) +--- +- true +... +assert(res == 10) +--- +- true +... +i, res = func(iter, i) +--- +... +-- This test is for the sake of checking how the async request handles nil,err +-- result. +assert(i == 2) +--- +- true +... +assert(not res[1] and res[2].message == 'err') +--- +- true +... +assert(type(res[2]) == 'table') +--- +- true +... +i, res = func(iter, i) +--- +... +assert(not i) +--- +- true +... +assert(not res) +--- +- true +... +-- Non-pairs getting of an error. +res, err = future:wait_result() +--- +... +assert(not res and err.message == 'err') +--- +- true +... +assert(type(err) == 'table') +--- +- true +... +res, err = future:result() +--- +... +assert(not res and err.message == 'err') +--- +- true +... +assert(type(err) == 'table') +--- +- true +... +-- -- Test errors from router call. -- new_bid = vshard.consts.DEFAULT_BUCKET_COUNT + 1 diff --git a/test/router/router.test.lua b/test/router/router.test.lua index 0017111..0f3854a 100644 --- a/test/router/router.test.lua +++ b/test/router/router.test.lua @@ -216,13 +216,15 @@ vshard.router.route(bucket_id):callrw('do_push', args, opts) messages -- --- gh-171: support is_async. +-- gh-171, gh-294: support is_async. -- future = vshard.router.callro(bucket_id, 'space_get', {'test', {1}}, {is_async = true}) future:wait_result() future:is_ready() future = vshard.router.callrw(bucket_id, 'raise_client_error', {}, {is_async = true}) res, err = future:wait_result() +-- VShard wraps all errors. +assert(type(err) == 'table') util.portable_error(err) future:is_ready() future = vshard.router.callrw(bucket_id, 'do_push', args, {is_async = true}) @@ -240,6 +242,122 @@ future:wait_result() future = vshard.router.route(bucket_id):callrw('space_get', {'test', {1}}, {is_async = true}) future:wait_result() +-- +-- Error as a result of discard. +-- +future = vshard.router.callrw(bucket_id, 'do_push_wait', {10, {20}}, \ + {is_async = true}) +future:discard() +res, err = future:result() +assert(not res and err.message:match('discarded') ~= nil) +assert(type(err) == 'table') +res, err = future:wait_result() +assert(not res and err.message:match('discarded') ~= nil) +assert(type(err) == 'table') + +-- +-- See how pairs behaves when the final result is not immediately ready. +-- +future = vshard.router.callrw(bucket_id, 'do_push_wait', {10, {20}}, \ + {is_async = true}) +assert(not future:is_ready()) +-- Get the push successfully. +func, iter, i = future:pairs() +i, res = func(iter, i) +assert(i == 1) +assert(res == 10) + +-- Fail to get the final result during the timeout. It is supposed to test how +-- the router knows which result is final and which is just a push. Even before +-- the request ends. +func, iter, i = future:pairs(0.001) +i, res = func(iter, i) +i, res = func(iter, i) +assert(not i and res.code == box.error.TIMEOUT) +assert(type(res) == 'table') + +res, err = future:wait_result(0.001) +assert(not res and err.code == box.error.TIMEOUT) +assert(type(err) == 'table') + +test_run:switch('storage_1_a') +is_push_wait_blocked = false +test_run:switch('storage_2_a') +is_push_wait_blocked = false +test_run:switch('router_1') + +func, iter, i = future:pairs() +i, res = func(iter, i) +assert(i == 1) +assert(res == 10) + +i, res = func(iter, i) +assert(i == 2) +assert(res[1] == 20 and not res[2]) + +assert(future:is_ready()) + +i, res = func(iter, i) +assert(not i) +assert(not res) + +-- Repeat the same to ensure it returns the same. +i, res = func(iter, 1) +assert(i == 2) +assert(res[1] == 20 and not res[2]) + +-- Non-pairs functions return correctly unpacked successful results. +res, err = future:wait_result() +assert(res[1] == 20 and not res[2] and not err) +res, err = future:result() +assert(res[1] == 20 and not res[2] and not err) + +-- Return 2 nils - shouldn't be treated as an error. +future = vshard.router.callrw(bucket_id, 'do_push_wait', \ + {10, {nil, nil}}, {is_async = true}) +res, err = future:wait_result() +assert(res[1] == nil and res[2] == nil and not err) +res, err = future:result() +assert(res[1] == nil and res[2] == nil and not err) +func, iter, i = future:pairs() +i, res = func(iter, i) +i, res = func(iter, i) +assert(res[1] == nil and res[2] == nil and not err) + +-- Serialize and tostring. +future +future.key = 'value' +future +tostring(future) + +-- +-- The same, but the push function returns an error. +-- +future = vshard.router.callrw(bucket_id, 'do_push_wait', {10, {nil, 'err'}}, \ + {is_async = true}) +func, iter, i = future:pairs() +i, res = func(iter, i) +assert(i == 1) +assert(res == 10) +i, res = func(iter, i) +-- This test is for the sake of checking how the async request handles nil,err +-- result. +assert(i == 2) +assert(not res[1] and res[2].message == 'err') +assert(type(res[2]) == 'table') +i, res = func(iter, i) +assert(not i) +assert(not res) + +-- Non-pairs getting of an error. +res, err = future:wait_result() +assert(not res and err.message == 'err') +assert(type(err) == 'table') + +res, err = future:result() +assert(not res and err.message == 'err') +assert(type(err) == 'table') + -- -- Test errors from router call. -- diff --git a/vshard/router/init.lua b/vshard/router/init.lua index 3d468f3..42de740 100644 --- a/vshard/router/init.lua +++ b/vshard/router/init.lua @@ -470,6 +470,114 @@ end -- API -------------------------------------------------------------------------------- +local function vshard_future_tostring(self) + return 'vshard.net.box.request' +end + +local function vshard_future_serialize(self) + -- Drop the metatable. It is also copied and if returned as is leads to + -- recursive serialization. + local s = setmetatable(table.deepcopy(self), {}) + s._base = nil + return s +end + +local function vshard_future_is_ready(self) + return self._base:is_ready() +end + +local function vshard_future_wrap_result(res) + local storage_ok, res, err = unpack(res) + if storage_ok then + if res == nil and err ~= nil then + return nil, lerror.make(err) + end + return setmetatable({res}, seq_serializer) + end + return nil, lerror.make(res) +end + +local function vshard_future_result(self) + local res, err = self._base:result() + if res == nil then + return nil, lerror.make(err) + end + return vshard_future_wrap_result(res) +end + +local function vshard_future_wait_result(self, timeout) + local res, err = self._base:wait_result(timeout) + if res == nil then + return nil, lerror.make(err) + end + return vshard_future_wrap_result(res) +end + +local function vshard_future_discard(self) + return self._base:discard() +end + +local function vshard_future_iter_next(iter, i) + local res, err + local base_next = iter.base_next + local base_req = iter.base_req + local base = iter.base + -- Need to distinguish the last response from the pushes. Because the former + -- has metadata returned by vshard.storage.call(). + -- At the same time there is no way to check if the base pairs() did its + -- last iteration except calling its next() function again. + -- This, in turn, might lead to a block if the result is not ready yet. + i, res = base_next(base, i) + -- To avoid that there is a 2-phase check. + -- If the request isn't finished after first next(), it means the result is + -- not received. This is a push. Return as is. + -- If the request is finished, it is safe to call next() again to check if + -- it ended. It won't block. + local is_done = base_req:is_ready() + + if not is_done then + -- Definitely a push. It would be finished if the final result was + -- received. + if i == nil then + return nil, lerror.make(res) + end + return i, res + end + if i == nil then + if res ~= nil then + return i, lerror.make(res) + end + return nil, nil + end + -- Will not block because the request is already finished. + if base_next(base, i) == nil then + res, err = vshard_future_wrap_result(res) + if res ~= nil then + return i, res + end + return i, {nil, lerror.make(err)} + end + return i, res +end + +local function vshard_future_pairs(self, timeout) + local next_f, iter, i = self._base:pairs(timeout) + return vshard_future_iter_next, + {base = iter, base_req = self, base_next = next_f}, i +end + +local vshard_future_mt = { + __tostring = vshard_future_tostring, + __serialize = vshard_future_serialize, + __index = { + is_ready = vshard_future_is_ready, + result = vshard_future_result, + wait_result = vshard_future_wait_result, + discard = vshard_future_discard, + pairs = vshard_future_pairs, + } +} + -- -- Since 1.10 netbox supports flag 'is_async'. Given this flag, a -- request result is returned immediately in a form of a future @@ -482,41 +590,9 @@ end -- So vshard.router.call should wrap a future object with its own -- unpacker of result. -- --- Unpack a result given from a future object from --- vshard.storage.call. If future returns [status, result, ...] --- this function returns [result]. Or a classical couple --- nil, error. --- -function future_storage_call_result(self) - local res, err = self:base_result() - if not res then - return nil, err - end - local storage_call_status, call_status, call_error = unpack(res) - if storage_call_status then - if call_status == nil and call_error ~= nil then - return call_status, call_error - else - return setmetatable({call_status}, seq_serializer) - end - end - return nil, call_status -end - --- --- Given a netbox future object, redefine its 'result' method. --- It is impossible to just create a new signle metatable per --- the module as a copy of original future's one because it has --- some upvalues related to the netbox connection. --- -local function wrap_storage_call_future(future) - -- Base 'result' below is got from __index metatable under the - -- hood. But __index is used only when a table has no such a - -- member in itself. So via adding 'result' as a member to a - -- future object its __index.result can be redefined. - future.base_result = future.result - future.result = future_storage_call_result - return future +local function vshard_future_new(future) + -- Use '_' as a prefix so as users could use all normal names. + return setmetatable({_base = future}, vshard_future_mt) end -- Perform shard operation @@ -574,7 +650,7 @@ local function router_call_impl(router, bucket_id, mode, prefer_replica, -- values: true/false and func result. But -- async returns future object. No true/false -- nor func result. So return the first value. - return wrap_storage_call_future(storage_call_status) + return vshard_future_new(storage_call_status) end end err = lerror.make(call_status) -- 2.24.3 (Apple Git-128)