[tarantool-patches] [PATCH v3 4/4] netbox: introduce iterable future objects
Vladislav Shpilevoy
v.shpilevoy at tarantool.org
Fri Jun 1 23:55:58 MSK 2018
Netbox has two major ways to execute a request: sync and async.
During execution of either, a server can send multiple responses
via IPROTO_CHUNK. The two ways differ in how they handle the
chunks (called messages or pushes).
For a sync request, one can specify an on_push callback and its
on_push_ctx argument; the callback is called on each message.
When a request is async, a user has only a future object and
cannot specify any callbacks. To get the pushed messages, one must
iterate over the future object like this:
for i, message in future:pairs(one_iteration_timeout) do
...
end
Or ignore the messages by just calling future:wait_result(). Either
way, messages are not deleted, so one can iterate over the future
object again and again.
Follow up #2677
---
src/box/lua/net_box.lua | 84 +++++++++++++++++++++
test/box/push.result | 191 +++++++++++++++++++++++++++++++++++++++++++++++-
test/box/push.test.lua | 79 +++++++++++++++++++-
3 files changed, 349 insertions(+), 5 deletions(-)
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index fe113e740..5d896f7e3 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -272,6 +272,90 @@ local function create_transport(host, port, user, password, callback,
end
end
--
+ -- Get the next message or the final result.
+ -- @param iterator Iterator object.
+ -- @param i Index of the previously returned message.
+ --
+ -- @retval nil, nil The request is finished.
+ -- @retval i + 1, object A message/response and its index.
+ -- @retval box.NULL, error An error occurred. When this
+ -- function is called in 'for k, v in future:pairs()',
+ -- `k` becomes box.NULL, and `v` becomes error object.
+ -- To stop the loop, the caller can use break.
+ -- With no break the loop continues until
+ -- the request is finished. Continuing the iteration
+ -- is useful, for example, when the timeout expires while
+ -- waiting for the next message, but the caller does not
+ -- consider this error to be critical.
+ -- On error the key becomes exactly box.NULL instead
+ -- of nil, because nil is treated by Lua as iteration
+ -- end marker. Nil does not participate in iteration,
+ -- and does not allow to continue it.
+ --
+ local function request_iterator_next(iterator, i)
+ if i == box.NULL then
+ -- The previous step returned an error. Resume from the
+ -- position saved on timeout; with none saved, stop.
+ if not iterator.next_i then
+ return nil, nil
+ end
+ i = iterator.next_i
+ iterator.next_i = nil
+ else
+ i = i + 1
+ end
+ local request = iterator.request
+ local messages = request.on_push_ctx
+ ::retry::
+ if i <= #messages then
+ return i, messages[i]
+ end
+ if request:is_ready() then
+ -- After all the messages are iterated, `i` is equal
+ -- to #messages + 1. Once the response has been returned,
+ -- `i` becomes #messages + 2, which is the trigger to
+ -- finish the iteration.
+ if i > #messages + 1 then
+ return nil, nil
+ end
+ local response, err = request:result()
+ if err then
+ return box.NULL, err
+ end
+ return i, response
+ end
+ local old_message_count = #messages
+ local timeout = iterator.timeout
+ repeat
+ local ts = fiber_clock()
+ request.cond:wait(timeout)
+ timeout = timeout - (fiber_clock() - ts)
+ if request:is_ready() or old_message_count ~= #messages then
+ goto retry
+ end
+ until timeout <= 0
+ iterator.next_i = i
+ return box.NULL, box.error.new(E_TIMEOUT)
+ end
+ --
+ -- Iterate over all messages received by a request. @sa
+ -- request_iterator_next for details on what to expect in `for`
+ -- key/value pairs.
+ -- @param timeout One iteration timeout, TIMEOUT_INFINITY if omitted.
+ -- @retval next() callback, iterator, zero key.
+ --
+ function request_index:pairs(timeout)
+ if timeout then
+ if type(timeout) ~= 'number' or timeout < 0 then
+ error('Usage: future:pairs(timeout)')
+ end
+ else
+ timeout = TIMEOUT_INFINITY
+ end
+ local iterator = {request = self, timeout = timeout}
+ return request_iterator_next, iterator, 0
+ end
+ --
-- Wait for a response or error max timeout seconds.
-- @param timeout Max seconds to wait.
-- @retval result, nil Success, the response is returned.
diff --git a/test/box/push.result b/test/box/push.result
index 8645533b5..0816c6754 100644
--- a/test/box/push.result
+++ b/test/box/push.result
@@ -299,9 +299,6 @@ c:close()
s:drop()
---
...
-box.schema.user.revoke('guest', 'read,write,execute', 'universe')
----
-...
--
-- Ensure can not push in background.
--
@@ -316,3 +313,191 @@ ok, err
- null
- Session 'background' does not support push()
...
+--
+-- Async iterable pushes.
+--
+c = netbox.connect(box.cfg.listen)
+---
+...
+cond = fiber.cond()
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+function do_pushes()
+ for i = 1, 5 do
+ box.session.push(i + 100)
+ cond:wait()
+ end
+ return true
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+-- Can not combine callback and async mode.
+ok, err = pcall(c.call, c, 'do_pushes', {}, {is_async = true, on_push = function() end})
+---
+...
+assert(not ok)
+---
+- true
+...
+assert(err:find('use future:pairs()') ~= nil)
+---
+- true
+...
+future = c:call('do_pushes', {}, {is_async = true})
+---
+...
+-- Try to ignore pushes.
+while not future:wait_result(0.01) do cond:signal() end
+---
+...
+future:result()
+---
+- [true]
+...
+-- Even if pushes are ignored, they still are available via pairs.
+messages = {}
+---
+...
+keys = {}
+---
+...
+for i, message in future:pairs() do table.insert(messages, message) table.insert(keys, i) end
+---
+...
+messages
+---
+- - 101
+ - 102
+ - 103
+ - 104
+ - 105
+ - [true]
+...
+keys
+---
+- - 1
+ - 2
+ - 3
+ - 4
+ - 5
+ - 6
+...
+-- Test timeouts inside `for`. Even if a timeout is got, a user
+-- can continue iteration making as many attempts to get a message
+-- as he wants.
+future = c:call('do_pushes', {}, {is_async = true})
+---
+...
+messages = {}
+---
+...
+keys = {}
+---
+...
+err_count = 0
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+for i, message in future:pairs(0.01) do
+ if i == nil then
+ err_count = err_count + 1
+ assert(message.code == box.error.TIMEOUT)
+ if err_count % 2 == 0 then
+ cond:signal()
+ end
+ else
+ table.insert(messages, message)
+ table.insert(keys, i)
+ end
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+-- Messages and keys are got in the correct order and with no
+-- duplicates regardless of big timeout count.
+messages
+---
+- - 101
+ - 102
+ - 103
+ - 104
+ - 105
+ - [true]
+...
+keys
+---
+- - 1
+ - 2
+ - 3
+ - 4
+ - 5
+ - 6
+...
+err_count
+---
+- 10
+...
+-- Test non-timeout error.
+s = box.schema.create_space('test')
+---
+...
+pk = s:create_index('pk')
+---
+...
+s:replace{1}
+---
+- [1]
+...
+function do_push_and_duplicate() box.session.push(100) s:insert{1} end
+---
+...
+future = c:call('do_push_and_duplicate', {}, {is_async = true})
+---
+...
+future:wait_result(1000)
+---
+- null
+- Duplicate key exists in unique index 'pk' in space 'test'
+...
+messages = {}
+---
+...
+keys = {}
+---
+...
+for i, message in future:pairs() do table.insert(messages, message) table.insert(keys, i) end
+---
+...
+messages
+---
+- - 100
+ - Duplicate key exists in unique index 'pk' in space 'test'
+...
+keys
+---
+- - 1
+ - null
+...
+s:drop()
+---
+...
+c:close()
+---
+...
+box.schema.user.revoke('guest', 'read,write,execute', 'universe')
+---
+...
diff --git a/test/box/push.test.lua b/test/box/push.test.lua
index da0634157..10bc201df 100644
--- a/test/box/push.test.lua
+++ b/test/box/push.test.lua
@@ -153,11 +153,86 @@ s:select{}
c:close()
s:drop()
-box.schema.user.revoke('guest', 'read,write,execute', 'universe')
-
--
-- Ensure can not push in background.
--
f = fiber.create(function() ok, err = box.session.push(100) end)
while f:status() ~= 'dead' do fiber.sleep(0.01) end
ok, err
+
+--
+-- Async iterable pushes.
+--
+c = netbox.connect(box.cfg.listen)
+cond = fiber.cond()
+test_run:cmd("setopt delimiter ';'")
+function do_pushes()
+ for i = 1, 5 do
+ box.session.push(i + 100)
+ cond:wait()
+ end
+ return true
+end;
+test_run:cmd("setopt delimiter ''");
+
+-- Can not combine callback and async mode.
+ok, err = pcall(c.call, c, 'do_pushes', {}, {is_async = true, on_push = function() end})
+assert(not ok)
+assert(err:find('use future:pairs()') ~= nil)
+future = c:call('do_pushes', {}, {is_async = true})
+-- Try to ignore pushes.
+while not future:wait_result(0.01) do cond:signal() end
+future:result()
+
+-- Even if pushes are ignored, they still are available via pairs.
+messages = {}
+keys = {}
+for i, message in future:pairs() do table.insert(messages, message) table.insert(keys, i) end
+messages
+keys
+
+-- Test timeouts inside `for`. Even if a timeout is got, a user
+-- can continue iteration making as many attempts to get a message
+-- as he wants.
+future = c:call('do_pushes', {}, {is_async = true})
+messages = {}
+keys = {}
+err_count = 0
+test_run:cmd("setopt delimiter ';'")
+for i, message in future:pairs(0.01) do
+ if i == nil then
+ err_count = err_count + 1
+ assert(message.code == box.error.TIMEOUT)
+ if err_count % 2 == 0 then
+ cond:signal()
+ end
+ else
+ table.insert(messages, message)
+ table.insert(keys, i)
+ end
+end;
+test_run:cmd("setopt delimiter ''");
+-- Messages and keys are got in the correct order and with no
+-- duplicates regardless of big timeout count.
+messages
+keys
+err_count
+
+-- Test non-timeout error.
+s = box.schema.create_space('test')
+pk = s:create_index('pk')
+s:replace{1}
+
+function do_push_and_duplicate() box.session.push(100) s:insert{1} end
+future = c:call('do_push_and_duplicate', {}, {is_async = true})
+future:wait_result(1000)
+messages = {}
+keys = {}
+for i, message in future:pairs() do table.insert(messages, message) table.insert(keys, i) end
+messages
+keys
+
+s:drop()
+c:close()
+
+box.schema.user.revoke('guest', 'read,write,execute', 'universe')
--
2.15.1 (Apple Git-101)
More information about the Tarantool-patches
mailing list