Tarantool development patches archive
 help / color / mirror / Atom feed
From: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
To: tarantool-patches@freelists.org
Cc: kostja@tarantool.org
Subject: [tarantool-patches] [PATCH v3 4/4] netbox: introduce iterable future objects
Date: Fri,  1 Jun 2018 23:55:58 +0300	[thread overview]
Message-ID: <9044b6d5e206cf3f9da67a1a8f32c070b8db55c4.1527886471.git.v.shpilevoy@tarantool.org> (raw)
In-Reply-To: <cover.1527886471.git.v.shpilevoy@tarantool.org>
In-Reply-To: <cover.1527886471.git.v.shpilevoy@tarantool.org>

Netbox has two major ways to execute a request: sync and async.
During execution of any a server can send multiplie responses via
IPROTO_CHUNK. And the execution ways differ in how to handle the
chunks (called messages or pushes).

For a sync request a one can specify on_push callback and its
on_push_ctx argument called on each message.

When a request is async a user has a future object only, and can
not specify any callbacks. To get the pushed messages a one must
iterate over future object like this:
for i, message in future:pairs(one_iteration_timeout) do
...
end
Or ignore messages just calling future:wait_result(). Anyway
messages are not deleted, so a one can iterate over future object
again and again.

Follow up #2677
---
 src/box/lua/net_box.lua |  84 +++++++++++++++++++++
 test/box/push.result    | 191 +++++++++++++++++++++++++++++++++++++++++++++++-
 test/box/push.test.lua  |  79 +++++++++++++++++++-
 3 files changed, 349 insertions(+), 5 deletions(-)

diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index fe113e740..5d896f7e3 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -272,6 +272,90 @@ local function create_transport(host, port, user, password, callback,
         end
     end
     --
+    -- Get the next message or the final result.
+    -- @param iterator Iterator object.
+    -- @param i Index to get a next message from.
+    --
+    -- @retval nil, nil The request is finished.
+    -- @retval i + 1, object A message/response and its index.
+    -- @retval box.NULL, error An error occured. When this
+    --         function is called in 'for k, v in future:pairs()',
+    --         `k` becomes box.NULL, and `v` becomes error object.
+    --         If a one want to stop the cycle, he can do break.
+    --         With no break the cycle will be continued until
+    --         the request is finished. The iteration continuation
+    --         is useful for example when time is out during a
+    --         next message waiting, but a one does not consider
+    --         this error be critical.
+    --         On error the key becomes exactly box.NULL instead
+    --         of nil, because nil is treated by Lua as iteration
+    --         end marker. Nil does not participate in iteration,
+    --         and does not allow to continue it.
+    --
+    local function request_iterator_next(iterator, i)
+        if i == box.NULL then
+            -- If a user continues iteration after an error -
+            -- restore position.
+            if not iterator.next_i then
+                return nil, nil
+            end
+            i = iterator.next_i
+            iterator.next_i = nil
+        else
+            i = i + 1
+        end
+        local request = iterator.request
+        local messages = request.on_push_ctx
+    ::retry::
+        if i <= #messages then
+            return i, messages[i]
+        end
+        if request:is_ready() then
+            -- After all the messages are iterated, `i` is equal
+            -- to #messages + 1. After response reading `i`
+            -- becomes #messages + 2. It is the trigger to finish
+            -- the iteration.
+            if i > #messages + 1 then
+                return nil, nil
+            end
+            local response, err = request:result()
+            if err then
+                return box.NULL, err
+            end
+            return i, response
+        end
+        local old_message_count = #messages
+        local timeout = iterator.timeout
+        repeat
+            local ts = fiber_clock()
+            request.cond:wait(timeout)
+            timeout = timeout - (fiber_clock() - ts)
+            if request:is_ready() or old_message_count ~= #messages then
+                goto retry
+            end
+        until timeout <= 0
+        iterator.next_i = i
+        return box.NULL, box.error.new(E_TIMEOUT)
+    end
+    --
+    -- Iterate over all messages, received by a request. @Sa
+    -- request_iterator_next for details what to expect in `for`
+    -- key/value pairs.
+    -- @param timeout One iteration timeout.
+    -- @retval next() callback, iterator, zero key.
+    --
+    function request_index:pairs(timeout)
+        if timeout then
+            if type(timeout) ~= 'number' or timeout < 0 then
+                error('Usage: future:pairs(timeout)')
+            end
+        else
+            timeout = TIMEOUT_INFINITY
+        end
+        local iterator = {request = self, timeout = timeout}
+        return request_iterator_next, iterator, 0
+    end
+    --
     -- Wait for a response or error max timeout seconds.
     -- @param timeout Max seconds to wait.
     -- @retval result, nil Success, the response is returned.
diff --git a/test/box/push.result b/test/box/push.result
index 8645533b5..0816c6754 100644
--- a/test/box/push.result
+++ b/test/box/push.result
@@ -299,9 +299,6 @@ c:close()
 s:drop()
 ---
 ...
-box.schema.user.revoke('guest', 'read,write,execute', 'universe')
----
-...
 --
 -- Ensure can not push in background.
 --
@@ -316,3 +313,191 @@ ok, err
 - null
 - Session 'background' does not support push()
 ...
+--
+-- Async iterable pushes.
+--
+c = netbox.connect(box.cfg.listen)
+---
+...
+cond = fiber.cond()
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+function do_pushes()
+    for i = 1, 5 do
+        box.session.push(i + 100)
+        cond:wait()
+    end
+    return true
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+-- Can not combine callback and async mode.
+ok, err = pcall(c.call, c, 'do_pushes', {}, {is_async = true, on_push = function() end})
+---
+...
+assert(not ok)
+---
+- true
+...
+assert(err:find('use future:pairs()') ~= nil)
+---
+- true
+...
+future = c:call('do_pushes', {}, {is_async = true})
+---
+...
+-- Try to ignore pushes.
+while not future:wait_result(0.01) do cond:signal() end
+---
+...
+future:result()
+---
+- [true]
+...
+-- Even if pushes are ignored, they still are available via pairs.
+messages = {}
+---
+...
+keys = {}
+---
+...
+for i, message in future:pairs() do table.insert(messages, message) table.insert(keys, i) end
+---
+...
+messages
+---
+- - 101
+  - 102
+  - 103
+  - 104
+  - 105
+  - [true]
+...
+keys
+---
+- - 1
+  - 2
+  - 3
+  - 4
+  - 5
+  - 6
+...
+-- Test timeouts inside `for`. Even if a timeout is got, a user
+-- can continue iteration making as many attempts to get a message
+-- as he wants.
+future = c:call('do_pushes', {}, {is_async = true})
+---
+...
+messages = {}
+---
+...
+keys = {}
+---
+...
+err_count = 0
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+for i, message in future:pairs(0.01) do
+    if i == nil then
+        err_count = err_count + 1
+        assert(message.code == box.error.TIMEOUT)
+        if err_count % 2 == 0 then
+            cond:signal()
+        end
+    else
+        table.insert(messages, message)
+        table.insert(keys, i)
+    end
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+-- Messages and keys are got in the correct order and with no
+-- duplicates regardless of big timeout count.
+messages
+---
+- - 101
+  - 102
+  - 103
+  - 104
+  - 105
+  - [true]
+...
+keys
+---
+- - 1
+  - 2
+  - 3
+  - 4
+  - 5
+  - 6
+...
+err_count
+---
+- 10
+...
+-- Test non-timeout error.
+s = box.schema.create_space('test')
+---
+...
+pk = s:create_index('pk')
+---
+...
+s:replace{1}
+---
+- [1]
+...
+function do_push_and_duplicate() box.session.push(100) s:insert{1} end
+---
+...
+future = c:call('do_push_and_duplicate', {}, {is_async = true})
+---
+...
+future:wait_result(1000)
+---
+- null
+- Duplicate key exists in unique index 'pk' in space 'test'
+...
+messages = {}
+---
+...
+keys = {}
+---
+...
+for i, message in future:pairs() do table.insert(messages, message) table.insert(keys, i) end
+---
+...
+messages
+---
+- - 100
+  - Duplicate key exists in unique index 'pk' in space 'test'
+...
+keys
+---
+- - 1
+  - null
+...
+s:drop()
+---
+...
+c:close()
+---
+...
+box.schema.user.revoke('guest', 'read,write,execute', 'universe')
+---
+...
diff --git a/test/box/push.test.lua b/test/box/push.test.lua
index da0634157..10bc201df 100644
--- a/test/box/push.test.lua
+++ b/test/box/push.test.lua
@@ -153,11 +153,86 @@ s:select{}
 c:close()
 s:drop()
 
-box.schema.user.revoke('guest', 'read,write,execute', 'universe')
-
 --
 -- Ensure can not push in background.
 --
 f = fiber.create(function() ok, err = box.session.push(100) end)
 while f:status() ~= 'dead' do fiber.sleep(0.01) end
 ok, err
+
+--
+-- Async iterable pushes.
+--
+c = netbox.connect(box.cfg.listen)
+cond = fiber.cond()
+test_run:cmd("setopt delimiter ';'")
+function do_pushes()
+    for i = 1, 5 do
+        box.session.push(i + 100)
+        cond:wait()
+    end
+    return true
+end;
+test_run:cmd("setopt delimiter ''");
+
+-- Can not combine callback and async mode.
+ok, err = pcall(c.call, c, 'do_pushes', {}, {is_async = true, on_push = function() end})
+assert(not ok)
+assert(err:find('use future:pairs()') ~= nil)
+future = c:call('do_pushes', {}, {is_async = true})
+-- Try to ignore pushes.
+while not future:wait_result(0.01) do cond:signal() end
+future:result()
+
+-- Even if pushes are ignored, they still are available via pairs.
+messages = {}
+keys = {}
+for i, message in future:pairs() do table.insert(messages, message) table.insert(keys, i) end
+messages
+keys
+
+-- Test timeouts inside `for`. Even if a timeout is got, a user
+-- can continue iteration making as many attempts to get a message
+-- as he wants.
+future = c:call('do_pushes', {}, {is_async = true})
+messages = {}
+keys = {}
+err_count = 0
+test_run:cmd("setopt delimiter ';'")
+for i, message in future:pairs(0.01) do
+    if i == nil then
+        err_count = err_count + 1
+        assert(message.code == box.error.TIMEOUT)
+        if err_count % 2 == 0 then
+            cond:signal()
+        end
+    else
+        table.insert(messages, message)
+        table.insert(keys, i)
+    end
+end;
+test_run:cmd("setopt delimiter ''");
+-- Messages and keys are got in the correct order and with no
+-- duplicates regardless of big timeout count.
+messages
+keys
+err_count
+
+-- Test non-timeout error.
+s = box.schema.create_space('test')
+pk = s:create_index('pk')
+s:replace{1}
+
+function do_push_and_duplicate() box.session.push(100) s:insert{1} end
+future = c:call('do_push_and_duplicate', {}, {is_async = true})
+future:wait_result(1000)
+messages = {}
+keys = {}
+for i, message in future:pairs() do table.insert(messages, message) table.insert(keys, i) end
+messages
+keys
+
+s:drop()
+c:close()
+
+box.schema.user.revoke('guest', 'read,write,execute', 'universe')
-- 
2.15.1 (Apple Git-101)

  parent reply	other threads:[~2018-06-01 20:56 UTC|newest]

Thread overview: 12+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2018-06-01 20:55 [tarantool-patches] [PATCH v3 0/4] box.session.push Vladislav Shpilevoy
2018-06-01 20:55 ` [tarantool-patches] [PATCH v3 1/4] session: introduce text box.session.push Vladislav Shpilevoy
2018-06-01 20:55 ` [tarantool-patches] [PATCH v3 2/4] session: enable box.session.push in local console Vladislav Shpilevoy
2018-06-01 20:55 ` [tarantool-patches] [PATCH v3 3/4] session: introduce binary box.session.push Vladislav Shpilevoy
2018-06-07 12:53   ` [tarantool-patches] " Konstantin Osipov
2018-06-07 17:02     ` Vladislav Shpilevoy
2018-06-08  3:51       ` Konstantin Osipov
2018-06-01 20:55 ` Vladislav Shpilevoy [this message]
2018-06-07 12:56   ` [tarantool-patches] Re: [PATCH v3 4/4] netbox: introduce iterable future objects Konstantin Osipov
2018-06-07 17:02     ` Vladislav Shpilevoy
2018-06-08  3:52       ` Konstantin Osipov
2018-06-08 14:20         ` Vladislav Shpilevoy

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=9044b6d5e206cf3f9da67a1a8f32c070b8db55c4.1527886471.git.v.shpilevoy@tarantool.org \
    --to=v.shpilevoy@tarantool.org \
    --cc=kostja@tarantool.org \
    --cc=tarantool-patches@freelists.org \
    --subject='Re: [tarantool-patches] [PATCH v3 4/4] netbox: introduce iterable future objects' \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox