From: Vladislav Shpilevoy
Subject: [tarantool-patches] [PATCH 1/1] netbox: introduce iterable future objects
Date: Thu, 24 May 2018 23:50:51 +0300
To: tarantool-patches@freelists.org
Cc: kostja@tarantool.org

Netbox has two major ways to execute a request: sync and async.
During execution of either one, a server can send multiple
responses via IPROTO_CHUNK. The two ways differ in how they
handle these chunks (called messages or pushes).

For a sync request, one can specify an on_push callback and its
on_push_ctx argument; the callback is called on each message.

When a request is async, a user has only a future object and can
not specify any callbacks. To get the pushed messages, one must
iterate over the future object like this:

    for i, message in future:pairs(one_iteration_timeout) do
        ...
    end

Or ignore the messages by just calling future:wait_result().
Either way, messages are not deleted, so one can iterate over the
future object again and again.

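The iteration contract can be tried outside Tarantool with a rough plain-Lua sketch. Everything in it is an assumption made for illustration: `NULL` is a stand-in for box.NULL, and `make_iterator` with its scripted 'timeout' events is a hypothetical replacement for a real future object. It only shows why the error key has to be a non-nil sentinel: Lua ends a generic `for` as soon as the first returned value is nil, so a nil key would make it impossible to continue after a timeout.

```lua
-- Stand-in for box.NULL (assumption: any unique non-nil value is
-- enough in plain Lua; in Tarantool box.NULL is a cdata null).
local NULL = {}

-- Hypothetical scripted source: every call to the iterator function
-- consumes one event. The string 'timeout' simulates an expired wait.
local function make_iterator(events)
    local state = {events = events, pos = 0, next_i = nil}
    local function next_message(st, i)
        if i == NULL then
            -- The caller continues after an error: restore position.
            i = st.next_i
            st.next_i = nil
        else
            i = i + 1
        end
        st.pos = st.pos + 1
        local ev = st.events[st.pos]
        if ev == nil then
            return nil              -- nil key: Lua stops the loop
        end
        if ev == 'timeout' then
            st.next_i = i           -- remember where to resume
            return NULL, 'timed out'
        end
        return i, ev
    end
    return next_message, state, 0
end

local messages, keys, err_count = {}, {}, 0
for i, message in make_iterator({101, 'timeout', 'timeout', 102, 103}) do
    if i == NULL then
        -- An error: count it and keep iterating.
        err_count = err_count + 1
    else
        table.insert(keys, i)
        table.insert(messages, message)
    end
end
```

In this sketch the two simulated timeouts do not shift the message indexes: the loop collects keys 1, 2, 3 and messages 101, 102, 103.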
Follow up #2677
---
 src/box/lua/net_box.lua |  84 +++++++++++++++++++++
 test/box/push.result    | 191 +++++++++++++++++++++++++++++++++++++++++++++++-
 test/box/push.test.lua  |  79 +++++++++++++++++++-
 3 files changed, 349 insertions(+), 5 deletions(-)

diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index fe113e740..5d896f7e3 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -272,6 +272,90 @@ local function create_transport(host, port, user, password, callback,
         end
     end
     --
+    -- Get the next message or the final result.
+    -- @param iterator Iterator object.
+    -- @param i Index to get a next message from.
+    --
+    -- @retval nil, nil The request is finished.
+    -- @retval i + 1, object A message/response and its index.
+    -- @retval box.NULL, error An error occured. When this
+    --         function is called in 'for k, v in future:pairs()',
+    --         `k` becomes box.NULL, and `v` becomes error object.
+    --         If a one want to stop the cycle, he can do break.
+    --         With no break the cycle will be continued until
+    --         the request is finished. The iteration continuation
+    --         is useful for example when time is out during a
+    --         next message waiting, but a one does not consider
+    --         this error be critical.
+    --         On error the key becomes exactly box.NULL instead
+    --         of nil, because nil is treated by Lua as iteration
+    --         end marker. Nil does not participate in iteration,
+    --         and does not allow to continue it.
+    --
+    local function request_iterator_next(iterator, i)
+        if i == box.NULL then
+            -- If a user continues iteration after an error -
+            -- restore position.
+            if not iterator.next_i then
+                return nil, nil
+            end
+            i = iterator.next_i
+            iterator.next_i = nil
+        else
+            i = i + 1
+        end
+        local request = iterator.request
+        local messages = request.on_push_ctx
+    ::retry::
+        if i <= #messages then
+            return i, messages[i]
+        end
+        if request:is_ready() then
+            -- After all the messages are iterated, `i` is equal
+            -- to #messages + 1. After response reading `i`
+            -- becomes #messages + 2. It is the trigger to finish
+            -- the iteration.
+            if i > #messages + 1 then
+                return nil, nil
+            end
+            local response, err = request:result()
+            if err then
+                return box.NULL, err
+            end
+            return i, response
+        end
+        local old_message_count = #messages
+        local timeout = iterator.timeout
+        repeat
+            local ts = fiber_clock()
+            request.cond:wait(timeout)
+            timeout = timeout - (fiber_clock() - ts)
+            if request:is_ready() or old_message_count ~= #messages then
+                goto retry
+            end
+        until timeout <= 0
+        iterator.next_i = i
+        return box.NULL, box.error.new(E_TIMEOUT)
+    end
+    --
+    -- Iterate over all messages, received by a request. @Sa
+    -- request_iterator_next for details what to expect in `for`
+    -- key/value pairs.
+    -- @param timeout One iteration timeout.
+    -- @retval next() callback, iterator, zero key.
+    --
+    function request_index:pairs(timeout)
+        if timeout then
+            if type(timeout) ~= 'number' or timeout < 0 then
+                error('Usage: future:pairs(timeout)')
+            end
+        else
+            timeout = TIMEOUT_INFINITY
+        end
+        local iterator = {request = self, timeout = timeout}
+        return request_iterator_next, iterator, 0
+    end
+    --
     -- Wait for a response or error max timeout seconds.
     -- @param timeout Max seconds to wait.
     -- @retval result, nil Success, the response is returned.
diff --git a/test/box/push.result b/test/box/push.result
index 04cdc474b..d90e4d190 100644
--- a/test/box/push.result
+++ b/test/box/push.result
@@ -245,9 +245,6 @@ c:close()
 s:drop()
 ---
 ...
-box.schema.user.revoke('guest', 'read,write,execute', 'universe')
----
-...
 --
 -- Ensure can not push in background.
 --
@@ -268,3 +265,191 @@ ok, err
 - null
 - Session 'background' does not support push()
 ...
+--
+-- Async iterable pushes.
+--
+c = netbox.connect(box.cfg.listen)
+---
+...
+cond = fiber.cond()
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+function do_pushes()
+    for i = 1, 5 do
+        box.session.push(i + 100)
+        cond:wait()
+    end
+    return true
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+-- Can not combine callback and async mode.
+ok, err = pcall(c.call, c, 'do_pushes', {}, {is_async = true, on_push = function() end})
+---
+...
+assert(not ok)
+---
+- true
+...
+assert(err:find('use future:pairs()') ~= nil)
+---
+- true
+...
+future = c:call('do_pushes', {}, {is_async = true})
+---
+...
+-- Try to ignore pushes.
+while not future:wait_result(0.01) do cond:signal() end
+---
+...
+future:result()
+---
+- [true]
+...
+-- Even if pushes are ignored, they still are available via pairs.
+messages = {}
+---
+...
+keys = {}
+---
+...
+for i, message in future:pairs() do table.insert(messages, message) table.insert(keys, i) end
+---
+...
+messages
+---
+- - 101
+  - 102
+  - 103
+  - 104
+  - 105
+  - [true]
+...
+keys
+---
+- - 1
+  - 2
+  - 3
+  - 4
+  - 5
+  - 6
+...
+-- Test timeouts inside `for`. Even if a timeout is got, a user
+-- can continue iteration making as many attempts to get a message
+-- as he wants.
+future = c:call('do_pushes', {}, {is_async = true})
+---
+...
+messages = {}
+---
+...
+keys = {}
+---
+...
+err_count = 0
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+for i, message in future:pairs(0.01) do
+    if i == nil then
+        err_count = err_count + 1
+        assert(message.code == box.error.TIMEOUT)
+        if err_count % 2 == 0 then
+            cond:signal()
+        end
+    else
+        table.insert(messages, message)
+        table.insert(keys, i)
+    end
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+-- Messages and keys are got in the correct order and with no
+-- duplicates regardless of big timeout count.
+messages
+---
+- - 101
+  - 102
+  - 103
+  - 104
+  - 105
+  - [true]
+...
+keys
+---
+- - 1
+  - 2
+  - 3
+  - 4
+  - 5
+  - 6
+...
+err_count
+---
+- 10
+...
+-- Test non-timeout error.
+s = box.schema.create_space('test')
+---
+...
+pk = s:create_index('pk')
+---
+...
+s:replace{1}
+---
+- [1]
+...
+function do_push_and_duplicate() box.session.push(100) s:insert{1} end
+---
+...
+future = c:call('do_push_and_duplicate', {}, {is_async = true})
+---
+...
+future:wait_result(1000)
+---
+- null
+- Duplicate key exists in unique index 'pk' in space 'test'
+...
+messages = {}
+---
+...
+keys = {}
+---
+...
+for i, message in future:pairs() do table.insert(messages, message) table.insert(keys, i) end
+---
+...
+messages
+---
+- - 100
+  - Duplicate key exists in unique index 'pk' in space 'test'
+...
+keys
+---
+- - 1
+  - null
+...
+s:drop()
+---
+...
+c:close()
+---
+...
+box.schema.user.revoke('guest', 'read,write,execute', 'universe')
+---
+...
diff --git a/test/box/push.test.lua b/test/box/push.test.lua
index b0ff218bb..829883835 100644
--- a/test/box/push.test.lua
+++ b/test/box/push.test.lua
@@ -131,8 +131,6 @@ r
 c:close()
 s:drop()
 
-box.schema.user.revoke('guest', 'read,write,execute', 'universe')
-
 --
 -- Ensure can not push in background.
 --
@@ -141,3 +139,80 @@ err = nil
 f = fiber.create(function() ok, err = box.session.push(100) end)
 while f:status() ~= 'dead' do fiber.sleep(0.01) end
 ok, err
+
+--
+-- Async iterable pushes.
+--
+c = netbox.connect(box.cfg.listen)
+cond = fiber.cond()
+test_run:cmd("setopt delimiter ';'")
+function do_pushes()
+    for i = 1, 5 do
+        box.session.push(i + 100)
+        cond:wait()
+    end
+    return true
+end;
+test_run:cmd("setopt delimiter ''");
+
+-- Can not combine callback and async mode.
+ok, err = pcall(c.call, c, 'do_pushes', {}, {is_async = true, on_push = function() end})
+assert(not ok)
+assert(err:find('use future:pairs()') ~= nil)
+future = c:call('do_pushes', {}, {is_async = true})
+-- Try to ignore pushes.
+while not future:wait_result(0.01) do cond:signal() end
+future:result()
+
+-- Even if pushes are ignored, they still are available via pairs.
+messages = {}
+keys = {}
+for i, message in future:pairs() do table.insert(messages, message) table.insert(keys, i) end
+messages
+keys
+
+-- Test timeouts inside `for`. Even if a timeout is got, a user
+-- can continue iteration making as many attempts to get a message
+-- as he wants.
+future = c:call('do_pushes', {}, {is_async = true})
+messages = {}
+keys = {}
+err_count = 0
+test_run:cmd("setopt delimiter ';'")
+for i, message in future:pairs(0.01) do
+    if i == nil then
+        err_count = err_count + 1
+        assert(message.code == box.error.TIMEOUT)
+        if err_count % 2 == 0 then
+            cond:signal()
+        end
+    else
+        table.insert(messages, message)
+        table.insert(keys, i)
+    end
+end;
+test_run:cmd("setopt delimiter ''");
+-- Messages and keys are got in the correct order and with no
+-- duplicates regardless of big timeout count.
+messages
+keys
+err_count
+
+-- Test non-timeout error.
+s = box.schema.create_space('test')
+pk = s:create_index('pk')
+s:replace{1}
+
+function do_push_and_duplicate() box.session.push(100) s:insert{1} end
+future = c:call('do_push_and_duplicate', {}, {is_async = true})
+future:wait_result(1000)
+messages = {}
+keys = {}
+for i, message in future:pairs() do table.insert(messages, message) table.insert(keys, i) end
+messages
+keys
+
+s:drop()
+c:close()
+
+box.schema.user.revoke('guest', 'read,write,execute', 'universe')
-- 
2.15.1 (Apple Git-101)