From: Vladislav Shpilevoy <v.shpilevoy@tarantool.org> To: tarantool-patches@freelists.org Cc: kostja@tarantool.org Subject: [tarantool-patches] [PATCH 1/1] netbox: introduce iterable future objects Date: Thu, 24 May 2018 23:50:51 +0300 [thread overview] Message-ID: <b6328d29cd9b1bf27f6b605c64b8a2fe8bdace85.1527194638.git.v.shpilevoy@tarantool.org> (raw) In-Reply-To: <cover.1524228894.git.v.shpilevoy@tarantool.org> Netbox has two major ways to execute a request: sync and async. During execution of any a server can send multiplie responses via IPROTO_CHUNK. And the execution ways differ in how to handle the chunks (called messages or pushes). For a sync request a one can specify on_push callback and its on_push_ctx argument called on each message. When a request is async a user has a future object only, and can not specify any callbacks. To get the pushed messages a one must iterate over future object like this: for i, message in future:pairs(one_iteration_timeout) do ... end Or ignore messages just calling future:wait_result(). Anyway messages are not deleted, so a one can iterate over future object again and again. Follow up #2677 --- src/box/lua/net_box.lua | 84 +++++++++++++++++++++ test/box/push.result | 191 +++++++++++++++++++++++++++++++++++++++++++++++- test/box/push.test.lua | 79 +++++++++++++++++++- 3 files changed, 349 insertions(+), 5 deletions(-) diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua index fe113e740..5d896f7e3 100644 --- a/src/box/lua/net_box.lua +++ b/src/box/lua/net_box.lua @@ -272,6 +272,90 @@ local function create_transport(host, port, user, password, callback, end end -- + -- Get the next message or the final result. + -- @param iterator Iterator object. + -- @param i Index to get a next message from. + -- + -- @retval nil, nil The request is finished. + -- @retval i + 1, object A message/response and its index. + -- @retval box.NULL, error An error occured. When this + -- function is called in 'for k, v in future:pairs()', + -- `k` becomes box.NULL, and `v` becomes error object. + -- If a one want to stop the cycle, he can do break. + -- With no break the cycle will be continued until + -- the request is finished. The iteration continuation + -- is useful for example when time is out during a + -- next message waiting, but a one does not consider + -- this error be critical. + -- On error the key becomes exactly box.NULL instead + -- of nil, because nil is treated by Lua as iteration + -- end marker. Nil does not participate in iteration, + -- and does not allow to continue it. + -- + local function request_iterator_next(iterator, i) + if i == box.NULL then + -- If a user continues iteration after an error - + -- restore position. + if not iterator.next_i then + return nil, nil + end + i = iterator.next_i + iterator.next_i = nil + else + i = i + 1 + end + local request = iterator.request + local messages = request.on_push_ctx + ::retry:: + if i <= #messages then + return i, messages[i] + end + if request:is_ready() then + -- After all the messages are iterated, `i` is equal + -- to #messages + 1. After response reading `i` + -- becomes #messages + 2. It is the trigger to finish + -- the iteration. + if i > #messages + 1 then + return nil, nil + end + local response, err = request:result() + if err then + return box.NULL, err + end + return i, response + end + local old_message_count = #messages + local timeout = iterator.timeout + repeat + local ts = fiber_clock() + request.cond:wait(timeout) + timeout = timeout - (fiber_clock() - ts) + if request:is_ready() or old_message_count ~= #messages then + goto retry + end + until timeout <= 0 + iterator.next_i = i + return box.NULL, box.error.new(E_TIMEOUT) + end + -- + -- Iterate over all messages, received by a request. @Sa + -- request_iterator_next for details what to expect in `for` + -- key/value pairs. + -- @param timeout One iteration timeout. + -- @retval next() callback, iterator, zero key. + -- + function request_index:pairs(timeout) + if timeout then + if type(timeout) ~= 'number' or timeout < 0 then + error('Usage: future:pairs(timeout)') + end + else + timeout = TIMEOUT_INFINITY + end + local iterator = {request = self, timeout = timeout} + return request_iterator_next, iterator, 0 + end + -- -- Wait for a response or error max timeout seconds. -- @param timeout Max seconds to wait. -- @retval result, nil Success, the response is returned. diff --git a/test/box/push.result b/test/box/push.result index 04cdc474b..d90e4d190 100644 --- a/test/box/push.result +++ b/test/box/push.result @@ -245,9 +245,6 @@ c:close() s:drop() --- ... -box.schema.user.revoke('guest', 'read,write,execute', 'universe') ---- -... -- -- Ensure can not push in background. -- @@ -268,3 +265,191 @@ ok, err - null - Session 'background' does not support push() ... +-- +-- Async iterable pushes. +-- +c = netbox.connect(box.cfg.listen) +--- +... +cond = fiber.cond() +--- +... +test_run:cmd("setopt delimiter ';'") +--- +- true +... +function do_pushes() + for i = 1, 5 do + box.session.push(i + 100) + cond:wait() + end + return true +end; +--- +... +test_run:cmd("setopt delimiter ''"); +--- +- true +... +-- Can not combine callback and async mode. +ok, err = pcall(c.call, c, 'do_pushes', {}, {is_async = true, on_push = function() end}) +--- +... +assert(not ok) +--- +- true +... +assert(err:find('use future:pairs()') ~= nil) +--- +- true +... +future = c:call('do_pushes', {}, {is_async = true}) +--- +... +-- Try to ignore pushes. +while not future:wait_result(0.01) do cond:signal() end +--- +... +future:result() +--- +- [true] +... +-- Even if pushes are ignored, they still are available via pairs. +messages = {} +--- +... +keys = {} +--- +... +for i, message in future:pairs() do table.insert(messages, message) table.insert(keys, i) end +--- +... +messages +--- +- - 101 + - 102 + - 103 + - 104 + - 105 + - [true] +... +keys +--- +- - 1 + - 2 + - 3 + - 4 + - 5 + - 6 +... +-- Test timeouts inside `for`. Even if a timeout is got, a user +-- can continue iteration making as many attempts to get a message +-- as he wants. +future = c:call('do_pushes', {}, {is_async = true}) +--- +... +messages = {} +--- +... +keys = {} +--- +... +err_count = 0 +--- +... +test_run:cmd("setopt delimiter ';'") +--- +- true +... +for i, message in future:pairs(0.01) do + if i == nil then + err_count = err_count + 1 + assert(message.code == box.error.TIMEOUT) + if err_count % 2 == 0 then + cond:signal() + end + else + table.insert(messages, message) + table.insert(keys, i) + end +end; +--- +... +test_run:cmd("setopt delimiter ''"); +--- +- true +... +-- Messages and keys are got in the correct order and with no +-- duplicates regardless of big timeout count. +messages +--- +- - 101 + - 102 + - 103 + - 104 + - 105 + - [true] +... +keys +--- +- - 1 + - 2 + - 3 + - 4 + - 5 + - 6 +... +err_count +--- +- 10 +... +-- Test non-timeout error. +s = box.schema.create_space('test') +--- +... +pk = s:create_index('pk') +--- +... +s:replace{1} +--- +- [1] +... +function do_push_and_duplicate() box.session.push(100) s:insert{1} end +--- +... +future = c:call('do_push_and_duplicate', {}, {is_async = true}) +--- +... +future:wait_result(1000) +--- +- null +- Duplicate key exists in unique index 'pk' in space 'test' +... +messages = {} +--- +... +keys = {} +--- +... +for i, message in future:pairs() do table.insert(messages, message) table.insert(keys, i) end +--- +... +messages +--- +- - 100 + - Duplicate key exists in unique index 'pk' in space 'test' +... +keys +--- +- - 1 + - null +... +s:drop() +--- +... +c:close() +--- +... +box.schema.user.revoke('guest', 'read,write,execute', 'universe') +--- +... diff --git a/test/box/push.test.lua b/test/box/push.test.lua index b0ff218bb..829883835 100644 --- a/test/box/push.test.lua +++ b/test/box/push.test.lua @@ -131,8 +131,6 @@ r c:close() s:drop() -box.schema.user.revoke('guest', 'read,write,execute', 'universe') - -- -- Ensure can not push in background. -- @@ -141,3 +139,80 @@ err = nil f = fiber.create(function() ok, err = box.session.push(100) end) while f:status() ~= 'dead' do fiber.sleep(0.01) end ok, err + +-- +-- Async iterable pushes. +-- +c = netbox.connect(box.cfg.listen) +cond = fiber.cond() +test_run:cmd("setopt delimiter ';'") +function do_pushes() + for i = 1, 5 do + box.session.push(i + 100) + cond:wait() + end + return true +end; +test_run:cmd("setopt delimiter ''"); + +-- Can not combine callback and async mode. +ok, err = pcall(c.call, c, 'do_pushes', {}, {is_async = true, on_push = function() end}) +assert(not ok) +assert(err:find('use future:pairs()') ~= nil) +future = c:call('do_pushes', {}, {is_async = true}) +-- Try to ignore pushes. +while not future:wait_result(0.01) do cond:signal() end +future:result() + +-- Even if pushes are ignored, they still are available via pairs. +messages = {} +keys = {} +for i, message in future:pairs() do table.insert(messages, message) table.insert(keys, i) end +messages +keys + +-- Test timeouts inside `for`. Even if a timeout is got, a user +-- can continue iteration making as many attempts to get a message +-- as he wants. +future = c:call('do_pushes', {}, {is_async = true}) +messages = {} +keys = {} +err_count = 0 +test_run:cmd("setopt delimiter ';'") +for i, message in future:pairs(0.01) do + if i == nil then + err_count = err_count + 1 + assert(message.code == box.error.TIMEOUT) + if err_count % 2 == 0 then + cond:signal() + end + else + table.insert(messages, message) + table.insert(keys, i) + end +end; +test_run:cmd("setopt delimiter ''"); +-- Messages and keys are got in the correct order and with no +-- duplicates regardless of big timeout count. +messages +keys +err_count + +-- Test non-timeout error. +s = box.schema.create_space('test') +pk = s:create_index('pk') +s:replace{1} + +function do_push_and_duplicate() box.session.push(100) s:insert{1} end +future = c:call('do_push_and_duplicate', {}, {is_async = true}) +future:wait_result(1000) +messages = {} +keys = {} +for i, message in future:pairs() do table.insert(messages, message) table.insert(keys, i) end +messages +keys + +s:drop() +c:close() + +box.schema.user.revoke('guest', 'read,write,execute', 'universe') -- 2.15.1 (Apple Git-101)
next prev parent reply other threads:[~2018-05-24 20:50 UTC|newest] Thread overview: 34+ messages / expand[flat|nested] mbox.gz Atom feed top 2018-04-20 13:24 [PATCH v2 00/10] session: introduce box.session.push Vladislav Shpilevoy 2018-04-20 13:24 ` [PATCH v2 01/10] yaml: don't throw OOM on any error in yaml encoding Vladislav Shpilevoy 2018-05-10 18:10 ` [tarantool-patches] " Konstantin Osipov 2018-04-20 13:24 ` [tarantool-patches] [PATCH v2 10/10] session: introduce binary box.session.push Vladislav Shpilevoy 2018-05-10 19:50 ` Konstantin Osipov 2018-05-24 20:50 ` [tarantool-patches] " Vladislav Shpilevoy 2018-04-20 13:24 ` [PATCH v2 02/10] yaml: introduce yaml.encode_tagged Vladislav Shpilevoy 2018-05-10 18:22 ` [tarantool-patches] " Konstantin Osipov 2018-05-24 20:50 ` [tarantool-patches] " Vladislav Shpilevoy 2018-05-30 19:15 ` Konstantin Osipov 2018-05-30 20:49 ` Vladislav Shpilevoy 2018-05-31 10:46 ` Konstantin Osipov 2018-04-20 13:24 ` [PATCH v2 03/10] yaml: introduce yaml.decode_tag Vladislav Shpilevoy 2018-05-10 18:41 ` [tarantool-patches] " Konstantin Osipov 2018-05-24 20:50 ` [tarantool-patches] " Vladislav Shpilevoy 2018-05-31 10:54 ` Konstantin Osipov 2018-05-31 11:36 ` Konstantin Osipov 2018-04-20 13:24 ` [PATCH v2 04/10] console: use Lua C API to do formatting for console Vladislav Shpilevoy 2018-05-10 18:46 ` [tarantool-patches] " Konstantin Osipov 2018-05-24 20:50 ` [tarantool-patches] " Vladislav Shpilevoy 2018-04-20 13:24 ` [PATCH v2 05/10] session: move salt into iproto connection Vladislav Shpilevoy 2018-05-10 18:47 ` [tarantool-patches] " Konstantin Osipov 2018-04-20 13:24 ` [PATCH v2 06/10] session: introduce session vtab and meta Vladislav Shpilevoy 2018-05-10 19:20 ` [tarantool-patches] " Konstantin Osipov 2018-05-24 20:50 ` [tarantool-patches] " Vladislav Shpilevoy 2018-04-20 13:24 ` [PATCH v2 07/10] port: rename dump() into dump_msgpack() Vladislav Shpilevoy 2018-05-10 19:21 ` [tarantool-patches] " Konstantin Osipov 2018-04-20 13:24 ` [PATCH v2 08/10] session: introduce text box.session.push Vladislav Shpilevoy 2018-05-10 19:27 ` [tarantool-patches] " Konstantin Osipov 2018-05-24 20:50 ` [tarantool-patches] " Vladislav Shpilevoy 2018-04-20 13:24 ` [PATCH v2 09/10] session: enable box.session.push in local console Vladislav Shpilevoy 2018-05-10 19:28 ` [tarantool-patches] " Konstantin Osipov 2018-05-24 20:50 ` Vladislav Shpilevoy [this message] 2018-06-04 22:17 ` [tarantool-patches] Re: [PATCH 1/1] netbox: introduce iterable future objects Vladislav Shpilevoy
Reply instructions: You may reply publicly to this message via plain-text email using any one of the following methods: * Save the following mbox file, import it into your mail client, and reply-to-all from there: mbox Avoid top-posting and favor interleaved quoting: https://en.wikipedia.org/wiki/Posting_style#Interleaved_style * Reply using the --to, --cc, and --in-reply-to switches of git-send-email(1): git send-email \ --in-reply-to=b6328d29cd9b1bf27f6b605c64b8a2fe8bdace85.1527194638.git.v.shpilevoy@tarantool.org \ --to=v.shpilevoy@tarantool.org \ --cc=kostja@tarantool.org \ --cc=tarantool-patches@freelists.org \ --subject='Re: [tarantool-patches] [PATCH 1/1] netbox: introduce iterable future objects' \ /path/to/YOUR_REPLY https://kernel.org/pub/software/scm/git/docs/git-send-email.html * If your mail client supports setting the In-Reply-To header via mailto: links, try the mailto: link
This is a public inbox, see mirroring instructions for how to clone and mirror all data and code used for this inbox