* [PATCH 0/8] netbox: introduce fiber-async API
@ 2018-04-16 18:39 Vladislav Shpilevoy
2018-04-16 18:39 ` [PATCH 1/8] lua: fix box.error.raise Vladislav Shpilevoy
` (7 more replies)
0 siblings, 8 replies; 32+ messages in thread
From: Vladislav Shpilevoy @ 2018-04-16 18:39 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev
Branch: http://github.com/tarantool/tarantool/tree/gh-3107-async-netbox
Issue: https://github.com/tarantool/tarantool/issues/3107
The patchset introduces a fiber-async API for netbox, but before
a lot of preparation work is done.
At first, the box.error module is fixed by repairing of
box.error.raise() and introducing box.error.new(). Box.error.new()
is needed for an async netbox future object to be able to save an
error object until a user requests a result. The second necessity
in box.error.new() is that with no error object a netbox can not
satisfy an error returning policy: in a case of error return nil
and error object.
At second, the interactive console is fixed to be able to detect a
spurious wakeup. It was needed for a first version of async
netbox, but then it was removed from the final version, and the
patch remains.
At third, a real netbox codecs are introduced. Netbox async future
object must be able to decode a response out of call context, when
the future object is already returned to a user. The future object
must be able to decode a response by only raw data and method
name. Moreover, it allows to remove double decoding of all
requests for https://github.com/tarantool/tarantool/issues/3333
(it is not implemented here - another patchset is needed).
At fourth, the async API is introduced.
Now any netbox call blocks a caller-fiber until a result is read
from a socket, or time is out. To use it asynchronously it is
necessary to create a fiber per request. Sometimes it is
unwanted - for example if RPS is very high (for example, about
100k), and latency is about 1 second. Or when it is neccessary
to send multiple requests in paralles and then collect responses
(map-reduce).
The patchset introduces a new option for all netbox requests:
is_async. With this option any called netbox method returns
immediately (but still yields for a moment) a 'future' object.
By a future object a user can check if the request is finalized,
get a result or error, wait for a timeout, discard a response.
Example of is_async usage:
future = conn:call(func, {params}, {..., is_async = true})
-- Do some work ...
if not future.is_ready() then
result, err = future:wait_result(timeout)
end
-- Or:
result, error = future:result()
A future:result() and :wait_result() returns either an error or
a response in the same format, as the sync versions of the called
methods.
Vladislav Shpilevoy (8):
lua: fix box.error.raise
lua: allow to create and error object with no throw
console: fix a bug in interactive readline usage
netbox: extend codec with 'decode' methods
test: fix unstable test
netbox: introduce fiber-async API
netbox: remove schema_version from requests
netbox: implement perform_request via async version
src/box/lua/console.c | 5 +-
src/box/lua/error.cc | 93 +++++---
src/box/lua/net_box.c | 105 ++++-----
src/box/lua/net_box.lua | 349 ++++++++++++++++++++----------
test/box/errinj.result | 3 +
test/box/errinj.test.lua | 1 +
test/box/misc.result | 64 ++++++
test/box/misc.test.lua | 28 +++
test/box/net.box.result | 537 +++++++++++++++++++++++++++++++++++++++++++++-
test/box/net.box.test.lua | 190 +++++++++++++++-
test/box/sql.result | 2 +
11 files changed, 1169 insertions(+), 208 deletions(-)
--
2.15.1 (Apple Git-101)
^ permalink raw reply [flat|nested] 32+ messages in thread
* [PATCH 1/8] lua: fix box.error.raise
2018-04-16 18:39 [PATCH 0/8] netbox: introduce fiber-async API Vladislav Shpilevoy
@ 2018-04-16 18:39 ` Vladislav Shpilevoy
2018-04-23 16:19 ` Vladimir Davydov
2018-05-08 15:36 ` [tarantool-patches] " Konstantin Osipov
2018-04-16 18:39 ` [PATCH 2/8] lua: allow to create and error object with no throw Vladislav Shpilevoy
` (6 subsequent siblings)
7 siblings, 2 replies; 32+ messages in thread
From: Vladislav Shpilevoy @ 2018-04-16 18:39 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev
It did not work because raise is implemented as __index metatable
member, and error() is __call metatable member. The second one
takes additional implicit argument - self.
---
src/box/lua/error.cc | 78 +++++++++++++++++++++++++++++++++-----------------
test/box/misc.result | 39 +++++++++++++++++++++++++
test/box/misc.test.lua | 18 ++++++++++++
3 files changed, 108 insertions(+), 27 deletions(-)
diff --git a/src/box/lua/error.cc b/src/box/lua/error.cc
index 314907421..56cc2c563 100644
--- a/src/box/lua/error.cc
+++ b/src/box/lua/error.cc
@@ -42,25 +42,18 @@ extern "C" {
#include "lua/utils.h"
#include "box/error.h"
-static int
-luaT_error_raise(lua_State *L)
+static void
+luaT_error_create(lua_State *L, int top, int top_base)
{
uint32_t code = 0;
const char *reason = NULL;
const char *file = "";
unsigned line = 0;
lua_Debug info;
- /* lua_type(L, 1) == LUA_TTABLE - box.error table */
- int top = lua_gettop(L);
- if (top <= 1) {
- /* re-throw saved exceptions (if any) */
- if (box_error_last())
- luaT_error(L);
- return 0;
- } else if (top >= 2 && lua_type(L, 2) == LUA_TNUMBER) {
- code = lua_tonumber(L, 2);
+ if (top >= top_base && lua_type(L, top_base) == LUA_TNUMBER) {
+ code = lua_tonumber(L, top_base);
reason = tnt_errcode_desc(code);
- if (top > 2) {
+ if (top > top_base) {
/* Call string.format(reason, ...) to format message */
lua_getglobal(L, "string");
if (lua_isnil(L, -1))
@@ -69,24 +62,29 @@ luaT_error_raise(lua_State *L)
if (lua_isnil(L, -1))
goto raise;
lua_pushstring(L, reason);
- for (int i = 3; i <= top; i++)
+ for (int i = top_base + 1; i <= top; i++)
lua_pushvalue(L, i);
- lua_call(L, top - 1, 1);
+ lua_call(L, top - top_base + 1, 1);
reason = lua_tostring(L, -1);
} else if (strchr(reason, '%') != NULL) {
/* Missing arguments to format string */
luaL_error(L, "box.error(): bad arguments");
}
- } else if (top == 2 && lua_istable(L, 2)) {
- /* A special case that rethrows raw error (used by net.box) */
- lua_getfield(L, 2, "code");
- code = lua_tonumber(L, -1);
- lua_pop(L, 1);
- lua_getfield(L, 2, "reason");
- reason = lua_tostring(L, -1);
- if (reason == NULL)
- reason = "";
- lua_pop(L, 1);
+ } else if (top == top_base) {
+ if (lua_istable(L, top_base)) {
+ /* A special case that rethrows raw error (used by net.box) */
+ lua_getfield(L, top_base, "code");
+ code = lua_tonumber(L, -1);
+ lua_pop(L, 1);
+ lua_getfield(L, top_base, "reason");
+ reason = lua_tostring(L, -1);
+ if (reason == NULL)
+ reason = "";
+ lua_pop(L, 1);
+ } else if (luaL_iserror(L, top_base)) {
+ lua_error(L);
+ return;
+ }
} else {
luaL_error(L, "box.error(): bad arguments");
}
@@ -104,8 +102,34 @@ raise:
}
say_debug("box.error() at %s:%i", file, line);
box_error_set(file, line, code, "%s", reason);
- luaT_error(L);
- return 0;
+}
+
+static int
+luaT_error_call(lua_State *L)
+{
+ int top = lua_gettop(L);
+ if (top <= 1) {
+ /* Re-throw saved exceptions if any. */
+ if (box_error_last())
+ luaT_error(L);
+ return 0;
+ }
+ luaT_error_create(L, top, 2);
+ return luaT_error(L);
+}
+
+static int
+luaT_error_raise(lua_State *L)
+{
+ int top = lua_gettop(L);
+ if (top == 0) {
+ /* Re-throw saved exceptions if any. */
+ if (box_error_last())
+ luaT_error(L);
+ return 0;
+ }
+ luaT_error_create(L, top, 1);
+ return luaT_error(L);
}
static int
@@ -214,7 +238,7 @@ box_lua_error_init(struct lua_State *L) {
}
lua_newtable(L);
{
- lua_pushcfunction(L, luaT_error_raise);
+ lua_pushcfunction(L, luaT_error_call);
lua_setfield(L, -2, "__call");
lua_newtable(L);
diff --git a/test/box/misc.result b/test/box/misc.result
index 57717c4fe..2102e4a1c 100644
--- a/test/box/misc.result
+++ b/test/box/misc.result
@@ -176,6 +176,45 @@ box.error(box.error.UNSUPPORTED)
---
- error: 'box.error(): bad arguments'
...
+--
+-- box.error.raise not worked because it is __index method of
+-- box.error and box.error() is the __call method. The second
+-- one takes itself as the first argument.
+--
+box.error(box.error.CREATE_SPACE, "space", "error")
+---
+- error: 'Failed to create space ''space'': error'
+...
+box.error()
+---
+- error: 'Failed to create space ''space'': error'
+...
+box.error.raise()
+---
+- error: 'Failed to create space ''space'': error'
+...
+box.error.raise(box.error.CREATE_SPACE, "space", "error")
+---
+- error: 'Failed to create space ''space'': error'
+...
+box.error.raise(box.error.UNKNOWN)
+---
+- error: Unknown error
+...
+--
+-- Allow to rethrow error.
+--
+_, err = pcall(box.error, box.error.UNKNOWN)
+---
+...
+box.error(err)
+---
+- error: Unknown error
+...
+box.error.raise(err)
+---
+- error: Unknown error
+...
----------------
-- # box.stat
----------------
diff --git a/test/box/misc.test.lua b/test/box/misc.test.lua
index b7bf600c3..299dc830f 100644
--- a/test/box/misc.test.lua
+++ b/test/box/misc.test.lua
@@ -52,6 +52,24 @@ box.error(box.error.UNSUPPORTED, "x", "x%s")
box.error(box.error.UNSUPPORTED, "x")
box.error(box.error.UNSUPPORTED)
+--
+-- box.error.raise not worked because it is __index method of
+-- box.error and box.error() is the __call method. The second
+-- one takes itself as the first argument.
+--
+box.error(box.error.CREATE_SPACE, "space", "error")
+box.error()
+box.error.raise()
+box.error.raise(box.error.CREATE_SPACE, "space", "error")
+box.error.raise(box.error.UNKNOWN)
+
+--
+-- Allow to rethrow error.
+--
+_, err = pcall(box.error, box.error.UNKNOWN)
+box.error(err)
+box.error.raise(err)
+
----------------
-- # box.stat
----------------
--
2.15.1 (Apple Git-101)
^ permalink raw reply [flat|nested] 32+ messages in thread
* [PATCH 2/8] lua: allow to create and error object with no throw
2018-04-16 18:39 [PATCH 0/8] netbox: introduce fiber-async API Vladislav Shpilevoy
2018-04-16 18:39 ` [PATCH 1/8] lua: fix box.error.raise Vladislav Shpilevoy
@ 2018-04-16 18:39 ` Vladislav Shpilevoy
2018-04-23 16:20 ` Vladimir Davydov
2018-05-08 15:37 ` [tarantool-patches] " Konstantin Osipov
2018-04-16 18:39 ` [PATCH 3/8] console: fix a bug in interactive readline usage Vladislav Shpilevoy
` (5 subsequent siblings)
7 siblings, 2 replies; 32+ messages in thread
From: Vladislav Shpilevoy @ 2018-04-16 18:39 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev
It is needed to return error via 'nil, error_object' notation,
and to store an error object to return it later.
Closes #3031
---
src/box/lua/error.cc | 15 +++++++++++++++
test/box/misc.result | 25 +++++++++++++++++++++++++
test/box/misc.test.lua | 10 ++++++++++
3 files changed, 50 insertions(+)
diff --git a/src/box/lua/error.cc b/src/box/lua/error.cc
index 56cc2c563..2a56f1a12 100644
--- a/src/box/lua/error.cc
+++ b/src/box/lua/error.cc
@@ -148,6 +148,17 @@ luaT_error_last(lua_State *L)
return 1;
}
+static int
+luaT_error_new(lua_State *L)
+{
+ int top = lua_gettop(L);
+ if (top == 0)
+ return luaL_error(L, "Usage: box.error.new(code, args)");
+ luaT_error_create(L, top, 1);
+ lua_settop(L, 0);
+ return luaT_error_last(L);
+}
+
static int
luaT_error_clear(lua_State *L)
{
@@ -254,6 +265,10 @@ box_lua_error_init(struct lua_State *L) {
lua_pushcfunction(L, luaT_error_raise);
lua_setfield(L, -2, "raise");
}
+ {
+ lua_pushcfunction(L, luaT_error_new);
+ lua_setfield(L, -2, "new");
+ }
lua_setfield(L, -2, "__index");
}
lua_setmetatable(L, -2);
diff --git a/test/box/misc.result b/test/box/misc.result
index 2102e4a1c..4b0f0e53d 100644
--- a/test/box/misc.result
+++ b/test/box/misc.result
@@ -215,6 +215,31 @@ box.error.raise(err)
---
- error: Unknown error
...
+--
+-- gh-3031: allow to create an error object with no throwing it.
+--
+e = box.error.new(box.error.UNKNOWN)
+---
+...
+e
+---
+- Unknown error
+...
+e = box.error.new(box.error.CREATE_SPACE, "space", "error")
+---
+...
+e
+---
+- 'Failed to create space ''space'': error'
+...
+box.error.new()
+---
+- error: 'Usage: box.error.new(code, args)'
+...
+box.error.raise()
+---
+- error: 'Failed to create space ''space'': error'
+...
----------------
-- # box.stat
----------------
diff --git a/test/box/misc.test.lua b/test/box/misc.test.lua
index 299dc830f..33900d24e 100644
--- a/test/box/misc.test.lua
+++ b/test/box/misc.test.lua
@@ -70,6 +70,16 @@ _, err = pcall(box.error, box.error.UNKNOWN)
box.error(err)
box.error.raise(err)
+--
+-- gh-3031: allow to create an error object with no throwing it.
+--
+e = box.error.new(box.error.UNKNOWN)
+e
+e = box.error.new(box.error.CREATE_SPACE, "space", "error")
+e
+box.error.new()
+box.error.raise()
+
----------------
-- # box.stat
----------------
--
2.15.1 (Apple Git-101)
^ permalink raw reply [flat|nested] 32+ messages in thread
* [PATCH 3/8] console: fix a bug in interactive readline usage
2018-04-16 18:39 [PATCH 0/8] netbox: introduce fiber-async API Vladislav Shpilevoy
2018-04-16 18:39 ` [PATCH 1/8] lua: fix box.error.raise Vladislav Shpilevoy
2018-04-16 18:39 ` [PATCH 2/8] lua: allow to create and error object with no throw Vladislav Shpilevoy
@ 2018-04-16 18:39 ` Vladislav Shpilevoy
2018-04-23 16:20 ` Vladimir Davydov
2018-05-08 15:37 ` [tarantool-patches] " Konstantin Osipov
2018-04-16 18:39 ` [PATCH 4/8] netbox: extend codec with 'decode' methods Vladislav Shpilevoy
` (4 subsequent siblings)
7 siblings, 2 replies; 32+ messages in thread
From: Vladislav Shpilevoy @ 2018-04-16 18:39 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev
Spurious wakeups are possible in console, that makes readline
think that there are some data on stdin. Waked up readline
returns garbage instead of string, that crashes a server on
assertion in Lua.
Closes #3343
---
src/box/lua/console.c | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
diff --git a/src/box/lua/console.c b/src/box/lua/console.c
index d27d7ecac..7a5ac5550 100644
--- a/src/box/lua/console.c
+++ b/src/box/lua/console.c
@@ -221,8 +221,9 @@ lbox_console_readline(struct lua_State *L)
*/
rl_callback_handler_install(prompt, console_push_line);
top = lua_gettop(L);
- while (top == lua_gettop(L) &&
- coio_wait(STDIN_FILENO, COIO_READ, TIMEOUT_INFINITY)) {
+ while (top == lua_gettop(L)) {
+ while (coio_wait(STDIN_FILENO, COIO_READ,
+ TIMEOUT_INFINITY) == 0);
rl_callback_read_char();
}
--
2.15.1 (Apple Git-101)
^ permalink raw reply [flat|nested] 32+ messages in thread
* [PATCH 4/8] netbox: extend codec with 'decode' methods
2018-04-16 18:39 [PATCH 0/8] netbox: introduce fiber-async API Vladislav Shpilevoy
` (2 preceding siblings ...)
2018-04-16 18:39 ` [PATCH 3/8] console: fix a bug in interactive readline usage Vladislav Shpilevoy
@ 2018-04-16 18:39 ` Vladislav Shpilevoy
2018-04-23 16:42 ` Vladimir Davydov
2018-05-08 15:49 ` [tarantool-patches] " Konstantin Osipov
2018-04-16 18:39 ` [PATCH 5/8] test: fix unstable test Vladislav Shpilevoy
` (3 subsequent siblings)
7 siblings, 2 replies; 32+ messages in thread
From: Vladislav Shpilevoy @ 2018-04-16 18:39 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev
Netbox has a table 'method_codec' that is used to encode a
request by a method name. But a response is decoded out of codec.
It leads to
1) decoding into Lua tables before decoding into tuples where
needed - it is double decoding and produces a lot of garbage;
2) each method contains hacks like one_tuple(), or single tuple
check.
These things can not be fixed with no real codec instead of
encoder only.
Also global table with decoders is needed for #3107, where
a request could be sent async with no fiber blocking. An async
response when received already does not have a call context - it
has only method name.
Needed for #3107
---
src/box/lua/net_box.lua | 116 +++++++++++++++++++++++++++++-------------------
test/box/net.box.result | 14 ++++++
test/box/sql.result | 2 +
3 files changed, 87 insertions(+), 45 deletions(-)
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index 4ed2b375d..3868cdf1c 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -50,7 +50,34 @@ local E_PROC_LUA = box.error.PROC_LUA
-- utility tables
local is_final_state = {closed = 1, error = 1}
-local method_codec = {
+
+local function decode_nil(...) end
+local function decode_nothing(...) return ... end
+local function decode_one_tuple(response)
+ if response[1] then
+ return box.tuple.new(response[1])
+ end
+end
+local function decode_single_tuple(response)
+ if response[2] then
+ return nil, box.error.MORE_THAN_ONE_TUPLE
+ end
+ if response[1] then
+ return box.tuple.new(response[1])
+ end
+end
+local function decode_select(response)
+ setmetatable(response, sequence_mt)
+ for i, v in pairs(response) do
+ response[i] = box.tuple.new(v)
+ end
+ return response
+end
+local function decode_count(response)
+ return response[1]
+end
+
+local method_encoder = {
ping = internal.encode_ping,
call_16 = internal.encode_call_16,
call_17 = internal.encode_call,
@@ -61,6 +88,10 @@ local method_codec = {
update = internal.encode_update,
upsert = internal.encode_upsert,
select = internal.encode_select,
+ get = internal.encode_select,
+ min = internal.encode_select,
+ max = internal.encode_select,
+ count = internal.encode_call,
-- inject raw data into connection, used by console and tests
inject = function(buf, id, schema_version, bytes)
local ptr = buf:reserve(#bytes)
@@ -69,6 +100,24 @@ local method_codec = {
end
}
+local method_decoder = {
+ ping = decode_nil,
+ call_16 = decode_select,
+ call_17 = decode_nothing,
+ eval = decode_nothing,
+ insert = decode_one_tuple,
+ replace = decode_one_tuple,
+ delete = decode_one_tuple,
+ update = decode_one_tuple,
+ upsert = decode_nil,
+ select = decode_select,
+ get = decode_single_tuple,
+ min = decode_single_tuple,
+ max = decode_single_tuple,
+ count = decode_count,
+ inject = decode_nothing,
+}
+
local function next_id(id) return band(id + 1, 0x7FFFFFFF) end
--
@@ -278,7 +327,7 @@ local function create_transport(host, port, user, password, callback,
worker_fiber:wakeup()
end
local id = next_request_id
- method_codec[method](send_buf, id, schema_version, ...)
+ method_encoder[method](send_buf, id, schema_version, ...)
next_request_id = next_id(id)
local request = table_new(0, 6) -- reserve space for 6 keys
request.client = fiber_self()
@@ -336,7 +385,10 @@ local function create_transport(host, port, user, password, callback,
-- Decode xrow.body[DATA] to Lua objects
body, body_end_check = decode(body_rpos)
assert(body_end == body_end_check, "invalid xrow length")
- request.response = body[IPROTO_DATA_KEY]
+ if body and body[IPROTO_DATA_KEY] then
+ request.response, request.errno =
+ method_decoder[request.method](body[IPROTO_DATA_KEY])
+ end
wakeup_client(request.client)
end
@@ -417,7 +469,7 @@ local function create_transport(host, port, user, password, callback,
log.warn("Netbox text protocol support is deprecated since 1.10, "..
"please use require('console').connect() instead")
local setup_delimiter = 'require("console").delimiter("$EOF$")\n'
- method_codec.inject(send_buf, nil, nil, setup_delimiter)
+ method_encoder.inject(send_buf, nil, nil, setup_delimiter)
local err, response = send_and_recv_console()
if err then
return error_sm(err, response)
@@ -835,18 +887,8 @@ function remote_methods:_request(method, opts, ...)
end
err, res = perform_request(timeout, buffer, method,
self.schema_version, ...)
- if not err and buffer ~= nil then
- return res -- the length of xrow.body
- elseif not err then
- setmetatable(res, sequence_mt)
- local postproc = method ~= 'eval' and method ~= 'call_17'
- if postproc then
- local tnew = box.tuple.new
- for i, v in pairs(res) do
- res[i] = tnew(v)
- end
- end
- return res -- decoded xrow.body[DATA]
+ if not err then
+ return res
elseif err == E_WRONG_SCHEMA_VERSION then
err = nil
end
@@ -1056,25 +1098,17 @@ function console_methods:eval(line, timeout)
return res[1] or res
end
-local function one_tuple(tab)
- if type(tab) ~= 'table' then
- return tab
- elseif tab[1] ~= nil then
- return tab[1]
- end
-end
-
space_metatable = function(remote)
local methods = {}
function methods:insert(tuple, opts)
check_space_arg(self, 'insert')
- return one_tuple(remote:_request('insert', opts, self.id, tuple))
+ return remote:_request('insert', opts, self.id, tuple)
end
function methods:replace(tuple, opts)
check_space_arg(self, 'replace')
- return one_tuple(remote:_request('replace', opts, self.id, tuple))
+ return remote:_request('replace', opts, self.id, tuple)
end
function methods:select(key, opts)
@@ -1094,8 +1128,7 @@ space_metatable = function(remote)
function methods:upsert(key, oplist, opts)
check_space_arg(self, 'upsert')
- remote:_request('upsert', opts, self.id, key, oplist)
- return
+ return remote:_request('upsert', opts, self.id, key, oplist)
end
function methods:get(key, opts)
@@ -1133,10 +1166,8 @@ index_metatable = function(remote)
if opts and opts.buffer then
error("index:get() doesn't support `buffer` argument")
end
- local res = remote:_request('select', opts, self.space.id, self.id,
- box.index.EQ, 0, 2, key)
- if res[2] ~= nil then box.error(box.error.MORE_THAN_ONE_TUPLE) end
- if res[1] ~= nil then return res[1] end
+ return remote:_request('get', opts, self.space.id, self.id,
+ box.index.EQ, 0, 2, key)
end
function methods:min(key, opts)
@@ -1144,9 +1175,8 @@ index_metatable = function(remote)
if opts and opts.buffer then
error("index:min() doesn't support `buffer` argument")
end
- local res = remote:_request('select', opts, self.space.id, self.id,
- box.index.GE, 0, 1, key)
- return one_tuple(res)
+ return remote:_request('get', opts, self.space.id, self.id,
+ box.index.GE, 0, 1, key)
end
function methods:max(key, opts)
@@ -1154,9 +1184,8 @@ index_metatable = function(remote)
if opts and opts.buffer then
error("index:max() doesn't support `buffer` argument")
end
- local res = remote:_request('select', opts, self.space.id, self.id,
- box.index.LE, 0, 1, key)
- return one_tuple(res)
+ return remote:_request('get', opts, self.space.id, self.id,
+ box.index.LE, 0, 1, key)
end
function methods:count(key, opts)
@@ -1166,21 +1195,18 @@ index_metatable = function(remote)
end
local code = string.format('box.space.%s.index.%s:count',
self.space.name, self.name)
- return remote:_request('call_16', opts, code, { key })[1][1]
+ return remote:_request('count', opts, code, { key })
end
function methods:delete(key, opts)
check_index_arg(self, 'delete')
- local res = remote:_request('delete', opts, self.space.id, self.id,
- key)
- return one_tuple(res)
+ return remote:_request('delete', opts, self.space.id, self.id, key)
end
function methods:update(key, oplist, opts)
check_index_arg(self, 'update')
- local res = remote:_request('update', opts, self.space.id, self.id,
- key, oplist)
- return one_tuple(res)
+ return remote:_request('update', opts, self.space.id, self.id, key,
+ oplist)
end
return { __index = methods, __metatable = false }
diff --git a/test/box/net.box.result b/test/box/net.box.result
index cf7b27f0b..6a3713fc0 100644
--- a/test/box/net.box.result
+++ b/test/box/net.box.result
@@ -416,9 +416,11 @@ cn.space.net_box_test_space:select({234}, { iterator = 'LT' })
...
cn.space.net_box_test_space:update({1}, { { '+', 2, 2 } })
---
+- null
...
cn.space.net_box_test_space:delete{1}
---
+- null
...
cn.space.net_box_test_space:delete{2}
---
@@ -426,6 +428,7 @@ cn.space.net_box_test_space:delete{2}
...
cn.space.net_box_test_space:delete{2}
---
+- null
...
-- test one-based indexing in splice operation (see update.test.lua)
cn.space.net_box_test_space:replace({10, 'abcde'})
@@ -754,12 +757,15 @@ remote_space:upsert({3}, {}, { timeout = 1e-9 })
...
remote_space:upsert({4}, {})
---
+- null
...
remote_space:upsert({5}, {}, { timeout = 1.00 })
---
+- null
...
remote_space:upsert({3}, {})
---
+- null
...
remote_space:update({3}, {}, { timeout = 1e-9 })
---
@@ -981,12 +987,15 @@ _ = remote_pk:delete({5})
...
remote_space:get(0)
---
+- null
...
remote_space:get(1)
---
+- null
...
remote_space:get(2)
---
+- null
...
remote_space = nil
---
@@ -1318,6 +1327,7 @@ c.space.test:select{}
...
c.space.test:upsert({1, 2, 'nothing'}, {{'+', 2, 1}}) -- common update
---
+- null
...
c.space.test:select{}
---
@@ -1325,6 +1335,7 @@ c.space.test:select{}
...
c.space.test:upsert({2, 4, 'something'}, {{'+', 2, 1}}) -- insert
---
+- null
...
c.space.test:select{}
---
@@ -1333,6 +1344,7 @@ c.space.test:select{}
...
c.space.test:upsert({2, 4, 'nothing'}, {{'+', 3, 100500}}) -- wrong operation
---
+- null
...
c.space.test:select{}
---
@@ -1481,6 +1493,7 @@ result
-- upsert
c.space.test:upsert({4}, {}, {buffer = ibuf})
---
+- 7
...
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
---
@@ -1492,6 +1505,7 @@ result
-- delete
c.space.test:upsert({4}, {}, {buffer = ibuf})
---
+- 7
...
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
---
diff --git a/test/box/sql.result b/test/box/sql.result
index 11a698850..95f8da7dd 100644
--- a/test/box/sql.result
+++ b/test/box/sql.result
@@ -105,6 +105,7 @@ space:select{1}
-- xxx: update comes through, returns 0 rows affected
space:update(1, {{'=', 2, 'I am a new tuple'}})
---
+- null
...
-- nothing is selected, since nothing was there
space:select{1}
@@ -208,6 +209,7 @@ space:delete(0)
...
space:delete(4294967295)
---
+- null
...
box.space.test:drop()
---
--
2.15.1 (Apple Git-101)
^ permalink raw reply [flat|nested] 32+ messages in thread
* [PATCH 5/8] test: fix unstable test
2018-04-16 18:39 [PATCH 0/8] netbox: introduce fiber-async API Vladislav Shpilevoy
` (3 preceding siblings ...)
2018-04-16 18:39 ` [PATCH 4/8] netbox: extend codec with 'decode' methods Vladislav Shpilevoy
@ 2018-04-16 18:39 ` Vladislav Shpilevoy
2018-04-22 5:32 ` [tarantool-patches] " Kirill Yukhin
2018-05-08 15:50 ` Konstantin Osipov
2018-04-16 18:39 ` [PATCH 6/8] netbox: introduce fiber-async API Vladislav Shpilevoy
` (2 subsequent siblings)
7 siblings, 2 replies; 32+ messages in thread
From: Vladislav Shpilevoy @ 2018-04-16 18:39 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev
---
test/box/errinj.result | 3 +++
test/box/errinj.test.lua | 1 +
2 files changed, 4 insertions(+)
diff --git a/test/box/errinj.result b/test/box/errinj.result
index 958f82dd9..5b4bc23a3 100644
--- a/test/box/errinj.result
+++ b/test/box/errinj.result
@@ -1136,6 +1136,9 @@ errinj.set("ERRINJ_WAL_DELAY", false)
---
- ok
...
+while ok == nil do fiber.sleep(0.01) end
+---
+...
ok, err
---
- true
diff --git a/test/box/errinj.test.lua b/test/box/errinj.test.lua
index a3dde8458..e1460d1b6 100644
--- a/test/box/errinj.test.lua
+++ b/test/box/errinj.test.lua
@@ -385,6 +385,7 @@ end)
test_run:cmd('setopt delimiter ""');
cn.space.test:get{1}
errinj.set("ERRINJ_WAL_DELAY", false)
+while ok == nil do fiber.sleep(0.01) end
ok, err
cn:close()
s:drop()
--
2.15.1 (Apple Git-101)
^ permalink raw reply [flat|nested] 32+ messages in thread
* [PATCH 6/8] netbox: introduce fiber-async API
2018-04-16 18:39 [PATCH 0/8] netbox: introduce fiber-async API Vladislav Shpilevoy
` (4 preceding siblings ...)
2018-04-16 18:39 ` [PATCH 5/8] test: fix unstable test Vladislav Shpilevoy
@ 2018-04-16 18:39 ` Vladislav Shpilevoy
2018-04-23 12:31 ` [tarantool-patches] " Alexander Turenko
2018-04-23 16:44 ` Vladimir Davydov
2018-04-16 18:39 ` [PATCH 7/8] netbox: remove schema_version from requests Vladislav Shpilevoy
2018-04-16 18:39 ` [PATCH 8/8] netbox: implement perform_request via async version Vladislav Shpilevoy
7 siblings, 2 replies; 32+ messages in thread
From: Vladislav Shpilevoy @ 2018-04-16 18:39 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev
Now any netbox call blocks a caller-fiber until a result is read
from a socket, or time is out. To use it asynchronously it is
necessary to create a fiber per request. Sometimes it is
unwanted - for example if RPS is very high (for example, about
100k), and latency is about 1 second. Or when it is neccessary
to send multiple requests in paralles and then collect responses
(map-reduce).
The patch introduces a new option for all netbox requests:
is_async. With this option any called netbox method returns
immediately (but still yields for a moment) a 'future' object.
By a future object a user can check if the request is finalized,
get a result or error, wait for a timeout, discard a response.
Example of is_async usage:
future = conn:call(func, {params}, {..., is_async = true})
-- Do some work ...
if not future.is_ready() then
result, err = future:wait_result(timeout)
end
-- Or:
result, error = future:result()
A future:result() and :wait_result() returns either an error or
a response in the same format, as the sync versions of the called
methods.
Part of #3107
---
src/box/lua/net_box.lua | 159 ++++++++++++--
test/box/net.box.result | 519 +++++++++++++++++++++++++++++++++++++++++++++-
test/box/net.box.test.lua | 186 ++++++++++++++++-
3 files changed, 836 insertions(+), 28 deletions(-)
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index 3868cdf1c..96f528963 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -214,12 +214,18 @@ local function create_transport(host, port, user, password, callback,
local last_error
local state_cond = fiber.cond() -- signaled when the state changes
- -- requests: requests currently 'in flight', keyed by a request id;
- -- value refs are weak hence if a client dies unexpectedly,
- -- GC cleans the mess. Client submits a request and waits on state_cond.
- -- If the reponse arrives within the timeout, the worker wakes
- -- client fiber explicitly. Otherwize, wait on state_cond completes and
- -- the client reports E_TIMEOUT.
+ -- Async requests currently 'in flight', keyed by a request
+ -- id. Value refs are weak hence if a client dies
+ -- unexpectedly, GC cleans the mess. Client either submits a
+ -- request and waits on state_cond, OR makes an async request
+ -- and does not block until a response is received. If the
+ -- request is not async and the reponse arrives within the
+ -- timeout, the worker wakes client fiber explicitly.
+ -- Otherwize, wait on state_cond completes and the client
+ -- reports E_TIMEOUT.
+ -- Async request can not be timed out completely. Instead a
+ -- user must decide when he does not want to wait for
+ -- response anymore.
local requests = setmetatable({}, { __mode = 'v' })
local next_request_id = 1
@@ -227,6 +233,94 @@ local function create_transport(host, port, user, password, callback,
local send_buf = buffer.ibuf(buffer.READAHEAD)
local recv_buf = buffer.ibuf(buffer.READAHEAD)
+ local function wakeup_client(client)
+ if client and client:status() ~= 'dead' then
+ client:wakeup()
+ end
+ end
+
+ --
+ -- Async request metamethods.
+ --
+ local request_index = {}
+ --
+ -- When an async request is finalized (with ok or error - no
+ -- matter), its 'id' field is nullified by a response
+ -- dispatcher.
+ --
+ function request_index:is_ready()
+ return self.id == nil or worker_fiber == nil
+ end
+ --
+ -- When a request is finished, a result can be got from a
+ -- future object anytime.
+ -- @retval result, nil Success, the response is returned.
+ -- @retval nil, error Error occured.
+ --
+ function request_index:result()
+ if self.errno then
+ return nil, box.error.new({code = self.errno,
+ reason = self.response})
+ elseif not self.id then
+ return self.response
+ elseif not worker_fiber then
+ return nil, box.error.new(E_NO_CONNECTION)
+ else
+ return nil, box.error.new(box.error.PROC_LUA,
+ 'Response is not ready')
+ end
+ end
+ --
+ -- Wait for a response or error max timeout seconds.
+ -- @param timeout Max seconds to wait.
+ -- @retval result, nil Success, the response is returned.
+ -- @retval nil, error Error occured.
+ --
+ function request_index:wait_result(timeout)
+ if timeout then
+ if type(timeout) ~= 'number' or timeout < 0 then
+ error('Usage: future:wait_result(timeout)')
+ end
+ else
+ timeout = TIMEOUT_INFINITY
+ end
+ if not self:is_ready() then
+ -- When a response is ready before timeout, the
+ -- waiting client is waked up spuriously.
+ local old_client = self.client
+ self.client = fiber.self()
+ while timeout > 0 and not self:is_ready() do
+ local ts = fiber.clock()
+ state_cond:wait(timeout)
+ timeout = timeout - (fiber.clock() - ts)
+ end
+ self.client = old_client
+ if not self:is_ready() then
+ return nil, box.error.new(E_TIMEOUT)
+ end
+ -- It is possible that multiple fibers are waiting for
+ -- a result. In such a case a first, who got it, must
+ -- wakeup the previous waiting client. This one wakes
+ -- up another. Another wakes up third one, etc.
+ wakeup_client(old_client)
+ end
+ return self:result()
+ end
+ --
+ -- Make a connection forget about the response. When it will
+ -- be received, it will be ignored.
+ --
+ function request_index:discard()
+ if self.id then
+ requests[self.id] = nil
+ self.id = nil
+ self.errno = box.error.PROC_LUA
+ self.response = 'Response is discarded'
+ end
+ end
+
+ local request_mt = { __index = request_index }
+
-- STATE SWITCHING --
local function set_state(new_state, new_errno, new_error)
state = new_state
@@ -236,6 +330,7 @@ local function create_transport(host, port, user, password, callback,
state_cond:broadcast()
if state == 'error' or state == 'error_reconnect' then
for _, request in pairs(requests) do
+ request.id = nil
request.errno = new_errno
request.response = new_error
end
@@ -315,12 +410,16 @@ local function create_transport(host, port, user, password, callback,
end
end
- -- REQUEST/RESPONSE --
- local function perform_request(timeout, buffer, method, schema_version, ...)
+ --
+ -- Send a request and do not wait for response.
+ -- @retval nil, error Error occured.
+ -- @retval not nil Future object.
+ --
+ local function perform_async_request(buffer, method, schema_version, ...)
if state ~= 'active' then
- return last_errno or E_NO_CONNECTION, last_error
+ return nil, box.error.new({code = last_errno or E_NO_CONNECTION,
+ reason = last_error})
end
- local deadline = fiber_clock() + (timeout or TIMEOUT_INFINITY)
-- alert worker to notify it of the queued outgoing data;
-- if the buffer wasn't empty, assume the worker was already alerted
if send_buf:size() == 0 then
@@ -329,12 +428,27 @@ local function create_transport(host, port, user, password, callback,
local id = next_request_id
method_encoder[method](send_buf, id, schema_version, ...)
next_request_id = next_id(id)
- local request = table_new(0, 6) -- reserve space for 6 keys
- request.client = fiber_self()
+ local request = setmetatable(table_new(0, 7), request_mt)
request.method = method
request.schema_version = schema_version
request.buffer = buffer
+ request.id = id
requests[id] = request
+ return request
+ end
+
+ --
+ -- Send a request and wait for response.
+ --
+ local function perform_request(timeout, buffer, method, schema_version, ...)
+ local request, err =
+ perform_async_request(buffer, method, schema_version, ...)
+ if not request then
+ return last_errno or E_NO_CONNECTION, last_error
+ end
+ request.client = fiber_self()
+ local id = request.id
+ local deadline = fiber_clock() + (timeout or TIMEOUT_INFINITY)
repeat
local timeout = max(0, deadline - fiber_clock())
if not state_cond:wait(timeout) then
@@ -345,12 +459,6 @@ local function create_transport(host, port, user, password, callback,
return request.errno, request.response
end
- local function wakeup_client(client)
- if client:status() ~= 'dead' then
- client:wakeup()
- end
- end
-
local function dispatch_response_iproto(hdr, body_rpos, body_end)
local id = hdr[IPROTO_SYNC_KEY]
local request = requests[id]
@@ -358,6 +466,7 @@ local function create_transport(host, port, user, password, callback,
return
end
requests[id] = nil
+ request.id = nil
local status = hdr[IPROTO_STATUS_KEY]
local body, body_end_check
@@ -607,7 +716,8 @@ local function create_transport(host, port, user, password, callback,
stop = stop,
start = start,
wait_state = wait_state,
- perform_request = perform_request
+ perform_request = perform_request,
+ perform_async_request = perform_async_request,
}
end
@@ -864,8 +974,12 @@ function remote_methods:wait_connected(timeout)
end
function remote_methods:_request(method, opts, ...)
- local this_fiber = fiber_self()
local transport = self._transport
+ local buffer = opts and opts.buffer
+ if opts and opts.is_async then
+ return transport.perform_async_request(buffer, method, 0, ...)
+ end
+ local this_fiber = fiber_self()
local perform_request = transport.perform_request
local wait_state = transport.wait_state
local deadline = nil
@@ -877,7 +991,6 @@ function remote_methods:_request(method, opts, ...)
-- @deprecated since 1.7.4
deadline = self._deadlines[this_fiber]
end
- local buffer = opts and opts.buffer
local err, res
repeat
local timeout = deadline and max(0, deadline - fiber_clock())
@@ -928,7 +1041,7 @@ function remote_methods:call(func_name, args, opts)
check_call_args(args)
args = args or {}
local res = self:_request('call_17', opts, tostring(func_name), args)
- if type(res) ~= 'table' then
+ if type(res) ~= 'table' or opts and opts.is_async then
return res
end
return unpack(res)
@@ -945,7 +1058,7 @@ function remote_methods:eval(code, args, opts)
check_eval_args(args)
args = args or {}
local res = self:_request('eval', opts, code, args)
- if type(res) ~= 'table' then
+ if type(res) ~= 'table' or opts and opts.is_async then
return res
end
return unpack(res)
diff --git a/test/box/net.box.result b/test/box/net.box.result
index 6a3713fc0..aaa421ec6 100644
--- a/test/box/net.box.result
+++ b/test/box/net.box.result
@@ -2475,9 +2475,6 @@ box.internal.collation.drop('test')
space:drop()
---
...
-box.schema.user.revoke('guest', 'read,write,execute', 'universe')
----
-...
c.state
---
- closed
@@ -2485,3 +2482,519 @@ c.state
c = nil
---
...
+--
+-- gh-3107: fiber-async netbox.
+--
+f = nil
+---
+...
+function long_function(...) f = fiber.self() fiber.sleep(1000000) return ... end
+---
+...
+s = box.schema.create_space('test')
+---
+...
+pk = s:create_index('pk')
+---
+...
+s:replace{1}
+---
+- [1]
+...
+s:replace{2}
+---
+- [2]
+...
+s:replace{3}
+---
+- [3]
+...
+s:replace{4}
+---
+- [4]
+...
+c = net:connect(box.cfg.listen)
+---
+...
+--
+-- Check long connections, multiple wait_result().
+--
+future = c:call('long_function', {1, 2, 3}, {is_async = true})
+---
+...
+future:result()
+---
+- null
+- Response is not ready
+...
+future:is_ready()
+---
+- false
+...
+future:wait_result(0.01)
+---
+- null
+- Timeout exceeded
+...
+f:wakeup()
+---
+...
+ret = future:wait_result(0.01)
+---
+...
+future:is_ready()
+---
+- true
+...
+future:wait_result(0.01)
+---
+- [1, 2, 3]
+...
+ret
+---
+- [1, 2, 3]
+...
+_, err = pcall(future.wait_result, future, true)
+---
+...
+err:find('Usage') ~= nil
+---
+- true
+...
+_, err = pcall(future.wait_result, future, '100')
+---
+...
+err:find('Usage') ~= nil
+---
+- true
+...
+--
+-- Check infinity timeout.
+--
+ret = nil
+---
+...
+_ = fiber.create(function() ret = c:call('long_function', {1, 2, 3}, {is_async = true}):wait_result() end)
+---
+...
+f:wakeup()
+---
+...
+while not ret do fiber.sleep(0.01) end
+---
+...
+ret
+---
+- [1, 2, 3]
+...
+future = c:eval('return long_function(...)', {1, 2, 3}, {is_async = true})
+---
+...
+future:result()
+---
+- null
+- Response is not ready
+...
+future:wait_result(0.01)
+---
+- null
+- Timeout exceeded
+...
+f:wakeup()
+---
+...
+future:wait_result(0.01)
+---
+- [1, 2, 3]
+...
+--
+-- Ensure the request is garbage collected both if is not used and
+-- if is.
+--
+gc_test = setmetatable({}, {__mode = 'v'})
+---
+...
+gc_test.future = c:call('long_function', {1, 2, 3}, {is_async = true})
+---
+...
+gc_test.future ~= nil
+---
+- true
+...
+collectgarbage()
+---
+- 0
+...
+gc_test
+---
+- []
+...
+f:wakeup()
+---
+...
+future = c:call('long_function', {1, 2, 3}, {is_async = true})
+---
+...
+collectgarbage()
+---
+- 0
+...
+future ~= nil
+---
+- true
+...
+f:wakeup()
+---
+...
+future:wait_result(1000)
+---
+- [1, 2, 3]
+...
+collectgarbage()
+---
+- 0
+...
+future ~= nil
+---
+- true
+...
+gc_test.future = future
+---
+...
+future = nil
+---
+...
+collectgarbage()
+---
+- 0
+...
+gc_test
+---
+- []
+...
+--
+-- Ensure a request can be finalized from non-caller fibers.
+--
+future = c:call('long_function', {1, 2, 3}, {is_async = true})
+---
+...
+ret = {}
+---
+...
+count = 0
+---
+...
+for i = 1, 10 do fiber.create(function() ret[i] = future:wait_result(1000) count = count + 1 end) end
+---
+...
+future:wait_result(0.01)
+---
+- null
+- Timeout exceeded
+...
+f:wakeup()
+---
+...
+while count ~= 10 do fiber.sleep(0.1) end
+---
+...
+ret
+---
+- - &0 [1, 2, 3]
+ - *0
+ - *0
+ - *0
+ - *0
+ - *0
+ - *0
+ - *0
+ - *0
+ - *0
+...
+--
+-- Test space methods.
+--
+future = c.space.test:select({1}, {is_async = true})
+---
+...
+ret = future:wait_result(100)
+---
+...
+ret
+---
+- - [1]
+...
+type(ret[1])
+---
+- cdata
+...
+future = c.space.test:insert({5}, {is_async = true})
+---
+...
+future:wait_result(100)
+---
+- [5]
+...
+s:get{5}
+---
+- [5]
+...
+future = c.space.test:replace({6}, {is_async = true})
+---
+...
+future:wait_result(100)
+---
+- [6]
+...
+s:get{6}
+---
+- [6]
+...
+future = c.space.test:delete({6}, {is_async = true})
+---
+...
+future:wait_result(100)
+---
+- [6]
+...
+s:get{6}
+---
+...
+future = c.space.test:update({5}, {{'=', 2, 5}}, {is_async = true})
+---
+...
+future:wait_result(100)
+---
+- [5, 5]
+...
+s:get{5}
+---
+- [5, 5]
+...
+future = c.space.test:upsert({5}, {{'=', 2, 6}}, {is_async = true})
+---
+...
+future:wait_result(100)
+---
+- null
+...
+s:get{5}
+---
+- [5, 6]
+...
+future = c.space.test:get({5}, {is_async = true})
+---
+...
+future:wait_result(100)
+---
+- [5, 6]
+...
+--
+-- Test index methods.
+--
+future = c.space.test.index.pk:select({1}, {is_async = true})
+---
+...
+future:wait_result(100)
+---
+- - [1]
+...
+future = c.space.test.index.pk:get({2}, {is_async = true})
+---
+...
+future:wait_result(100)
+---
+- [2]
+...
+future = c.space.test.index.pk:min({}, {is_async = true})
+---
+...
+future:wait_result(100)
+---
+- [1]
+...
+future = c.space.test.index.pk:max({}, {is_async = true})
+---
+...
+future:wait_result(100)
+---
+- [5, 6]
+...
+future = c.space.test.index.pk:count({3}, {is_async = true})
+---
+...
+future:wait_result(100)
+---
+- 1
+...
+future = c.space.test.index.pk:delete({3}, {is_async = true})
+---
+...
+future:wait_result(100)
+---
+- [3]
+...
+s:get{3}
+---
+...
+future = c.space.test.index.pk:update({4}, {{'=', 2, 6}}, {is_async = true})
+---
+...
+future:wait_result(100)
+---
+- [4, 6]
+...
+s:get{4}
+---
+- [4, 6]
+...
+--
+-- Test async errors.
+--
+future = c.space.test:insert({1}, {is_async = true})
+---
+...
+future:wait_result()
+---
+- null
+- Duplicate key exists in unique index 'pk' in space 'test'
+...
+future:result()
+---
+- null
+- Duplicate key exists in unique index 'pk' in space 'test'
+...
+--
+-- Test discard.
+--
+future = c:call('long_function', {1, 2, 3}, {is_async = true})
+---
+...
+future:discard()
+---
+...
+f:wakeup()
+---
+...
+future:result()
+---
+- null
+- Response is discarded
+...
+future:wait_result(100)
+---
+- null
+- Response is discarded
+...
+--
+-- Test closed connection.
+--
+future = c:call('long_function', {1, 2, 3}, {is_async = true})
+---
+...
+f:wakeup()
+---
+...
+future:wait_result(100)
+---
+- [1, 2, 3]
+...
+future2 = c:call('long_function', {1, 2, 3}, {is_async = true})
+---
+...
+c:close()
+---
+...
+future2:wait_result(100)
+---
+- null
+- Connection is not established
+...
+future2:result()
+---
+- null
+- Connection is not established
+...
+future2:discard()
+---
+...
+-- Already successful result must be available.
+future:wait_result(100)
+---
+- [1, 2, 3]
+...
+future:result()
+---
+- [1, 2, 3]
+...
+future:is_ready()
+---
+- true
+...
+--
+-- Test reconnect.
+--
+c = net:connect(box.cfg.listen, {reconnect_after = 0.01})
+---
+...
+future = c:call('long_function', {1, 2, 3}, {is_async = true})
+---
+...
+_ = c._transport.perform_request(nil, nil, 'inject', nil, '\x80')
+---
+...
+while not c:is_connected() do fiber.sleep(0.01) end
+---
+...
+future:wait_result(100)
+---
+- null
+- Peer closed
+...
+future:result()
+---
+- null
+- Peer closed
+...
+future = c:call('long_function', {1, 2, 3}, {is_async = true})
+---
+...
+f:wakeup()
+---
+...
+future:wait_result(100)
+---
+- [1, 2, 3]
+...
+--
+-- Test raw response getting.
+--
+ibuf = require('buffer').ibuf()
+---
+...
+future = c:call('long_function', {1, 2, 3}, {is_async = true, buffer = ibuf})
+---
+...
+f:wakeup()
+---
+...
+future:wait_result(100)
+---
+- 10
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- {48: [1, 2, 3]}
+...
+c:close()
+---
+...
+s:drop()
+---
+...
+box.schema.user.revoke('guest', 'read,write,execute', 'universe')
+---
+...
diff --git a/test/box/net.box.test.lua b/test/box/net.box.test.lua
index 576b5cfea..82c538fbe 100644
--- a/test/box/net.box.test.lua
+++ b/test/box/net.box.test.lua
@@ -1004,8 +1004,190 @@ c.space.test.index.sk.parts
c:close()
box.internal.collation.drop('test')
space:drop()
-
-box.schema.user.revoke('guest', 'read,write,execute', 'universe')
c.state
c = nil
+--
+-- gh-3107: fiber-async netbox.
+--
+f = nil
+function long_function(...) f = fiber.self() fiber.sleep(1000000) return ... end
+s = box.schema.create_space('test')
+pk = s:create_index('pk')
+s:replace{1}
+s:replace{2}
+s:replace{3}
+s:replace{4}
+c = net:connect(box.cfg.listen)
+--
+-- Check long connections, multiple wait_result().
+--
+future = c:call('long_function', {1, 2, 3}, {is_async = true})
+future:result()
+future:is_ready()
+future:wait_result(0.01)
+f:wakeup()
+ret = future:wait_result(0.01)
+future:is_ready()
+future:wait_result(0.01)
+ret
+
+_, err = pcall(future.wait_result, future, true)
+err:find('Usage') ~= nil
+_, err = pcall(future.wait_result, future, '100')
+err:find('Usage') ~= nil
+
+--
+-- Check infinity timeout.
+--
+ret = nil
+_ = fiber.create(function() ret = c:call('long_function', {1, 2, 3}, {is_async = true}):wait_result() end)
+f:wakeup()
+while not ret do fiber.sleep(0.01) end
+ret
+
+future = c:eval('return long_function(...)', {1, 2, 3}, {is_async = true})
+future:result()
+future:wait_result(0.01)
+f:wakeup()
+future:wait_result(0.01)
+
+--
+-- Ensure the request is garbage collected both if is not used and
+-- if is.
+--
+gc_test = setmetatable({}, {__mode = 'v'})
+gc_test.future = c:call('long_function', {1, 2, 3}, {is_async = true})
+gc_test.future ~= nil
+collectgarbage()
+gc_test
+f:wakeup()
+
+future = c:call('long_function', {1, 2, 3}, {is_async = true})
+collectgarbage()
+future ~= nil
+f:wakeup()
+future:wait_result(1000)
+collectgarbage()
+future ~= nil
+gc_test.future = future
+future = nil
+collectgarbage()
+gc_test
+
+--
+-- Ensure a request can be finalized from non-caller fibers.
+--
+future = c:call('long_function', {1, 2, 3}, {is_async = true})
+ret = {}
+count = 0
+for i = 1, 10 do fiber.create(function() ret[i] = future:wait_result(1000) count = count + 1 end) end
+future:wait_result(0.01)
+f:wakeup()
+while count ~= 10 do fiber.sleep(0.1) end
+ret
+
+--
+-- Test space methods.
+--
+future = c.space.test:select({1}, {is_async = true})
+ret = future:wait_result(100)
+ret
+type(ret[1])
+future = c.space.test:insert({5}, {is_async = true})
+future:wait_result(100)
+s:get{5}
+future = c.space.test:replace({6}, {is_async = true})
+future:wait_result(100)
+s:get{6}
+future = c.space.test:delete({6}, {is_async = true})
+future:wait_result(100)
+s:get{6}
+future = c.space.test:update({5}, {{'=', 2, 5}}, {is_async = true})
+future:wait_result(100)
+s:get{5}
+future = c.space.test:upsert({5}, {{'=', 2, 6}}, {is_async = true})
+future:wait_result(100)
+s:get{5}
+future = c.space.test:get({5}, {is_async = true})
+future:wait_result(100)
+
+--
+-- Test index methods.
+--
+future = c.space.test.index.pk:select({1}, {is_async = true})
+future:wait_result(100)
+future = c.space.test.index.pk:get({2}, {is_async = true})
+future:wait_result(100)
+future = c.space.test.index.pk:min({}, {is_async = true})
+future:wait_result(100)
+future = c.space.test.index.pk:max({}, {is_async = true})
+future:wait_result(100)
+future = c.space.test.index.pk:count({3}, {is_async = true})
+future:wait_result(100)
+future = c.space.test.index.pk:delete({3}, {is_async = true})
+future:wait_result(100)
+s:get{3}
+future = c.space.test.index.pk:update({4}, {{'=', 2, 6}}, {is_async = true})
+future:wait_result(100)
+s:get{4}
+
+--
+-- Test async errors.
+--
+future = c.space.test:insert({1}, {is_async = true})
+future:wait_result()
+future:result()
+
+--
+-- Test discard.
+--
+future = c:call('long_function', {1, 2, 3}, {is_async = true})
+future:discard()
+f:wakeup()
+future:result()
+future:wait_result(100)
+
+--
+-- Test closed connection.
+--
+future = c:call('long_function', {1, 2, 3}, {is_async = true})
+f:wakeup()
+future:wait_result(100)
+future2 = c:call('long_function', {1, 2, 3}, {is_async = true})
+c:close()
+future2:wait_result(100)
+future2:result()
+future2:discard()
+-- Already successful result must be available.
+future:wait_result(100)
+future:result()
+future:is_ready()
+
+--
+-- Test reconnect.
+--
+c = net:connect(box.cfg.listen, {reconnect_after = 0.01})
+future = c:call('long_function', {1, 2, 3}, {is_async = true})
+_ = c._transport.perform_request(nil, nil, 'inject', nil, '\x80')
+while not c:is_connected() do fiber.sleep(0.01) end
+future:wait_result(100)
+future:result()
+future = c:call('long_function', {1, 2, 3}, {is_async = true})
+f:wakeup()
+future:wait_result(100)
+
+--
+-- Test raw response getting.
+--
+ibuf = require('buffer').ibuf()
+future = c:call('long_function', {1, 2, 3}, {is_async = true, buffer = ibuf})
+f:wakeup()
+future:wait_result(100)
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
+c:close()
+s:drop()
+
+box.schema.user.revoke('guest', 'read,write,execute', 'universe')
--
2.15.1 (Apple Git-101)
^ permalink raw reply [flat|nested] 32+ messages in thread
* [PATCH 7/8] netbox: remove schema_version from requests
2018-04-16 18:39 [PATCH 0/8] netbox: introduce fiber-async API Vladislav Shpilevoy
` (5 preceding siblings ...)
2018-04-16 18:39 ` [PATCH 6/8] netbox: introduce fiber-async API Vladislav Shpilevoy
@ 2018-04-16 18:39 ` Vladislav Shpilevoy
2018-05-08 16:06 ` [tarantool-patches] " Konstantin Osipov
2018-04-16 18:39 ` [PATCH 8/8] netbox: implement perform_request via async version Vladislav Shpilevoy
7 siblings, 1 reply; 32+ messages in thread
From: Vladislav Shpilevoy @ 2018-04-16 18:39 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev
Schema_version was used in netbox to update local box-like
schema. The box-like schema makes able to access spaces and
indexes via connection object.
It was updated each time, when a response from a server is
received with a schema version non-equal to the local value.
But there was no reason why a schema version is needed in a
request. It leads to ER_WRONG_SCHEMA_VERSION error sometimes,
but netbox on this error just resends the same request again. The
same behaviour can be reached with just no sending any schema
version to a server.
Remove schema_version from request, and just track schema version
changes in responses.
Part of #3351
Part of #3333
Follow up #3107
---
src/box/lua/net_box.c | 105 +++++++++++++++++++++++-----------------------
src/box/lua/net_box.lua | 90 ++++++++++++++++-----------------------
test/box/net.box.result | 6 +--
test/box/net.box.test.lua | 6 +--
4 files changed, 95 insertions(+), 112 deletions(-)
diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c
index db2d2dbb4..04fe70b03 100644
--- a/src/box/lua/net_box.c
+++ b/src/box/lua/net_box.c
@@ -53,7 +53,6 @@ netbox_prepare_request(lua_State *L, struct mpstream *stream, uint32_t r_type)
{
struct ibuf *ibuf = (struct ibuf *) lua_topointer(L, 1);
uint64_t sync = luaL_touint64(L, 2);
- uint64_t schema_version = luaL_touint64(L, 3);
mpstream_init(stream, ibuf, ibuf_reserve_cb, ibuf_alloc_cb,
luamp_error, L);
@@ -67,14 +66,11 @@ netbox_prepare_request(lua_State *L, struct mpstream *stream, uint32_t r_type)
mpstream_advance(stream, fixheader_size);
/* encode header */
- luamp_encode_map(cfg, stream, 3);
+ luamp_encode_map(cfg, stream, 2);
luamp_encode_uint(cfg, stream, IPROTO_SYNC);
luamp_encode_uint(cfg, stream, sync);
- luamp_encode_uint(cfg, stream, IPROTO_SCHEMA_VERSION);
- luamp_encode_uint(cfg, stream, schema_version);
-
luamp_encode_uint(cfg, stream, IPROTO_REQUEST_TYPE);
luamp_encode_uint(cfg, stream, r_type);
@@ -111,9 +107,8 @@ netbox_encode_request(struct mpstream *stream, size_t initial_size)
static int
netbox_encode_ping(lua_State *L)
{
- if (lua_gettop(L) < 3)
- return luaL_error(L, "Usage: netbox.encode_ping(ibuf, sync, "
- "schema_version)");
+ if (lua_gettop(L) < 2)
+ return luaL_error(L, "Usage: netbox.encode_ping(ibuf, sync)");
struct mpstream stream;
size_t svp = netbox_prepare_request(L, &stream, IPROTO_PING);
@@ -124,19 +119,20 @@ netbox_encode_ping(lua_State *L)
static int
netbox_encode_auth(lua_State *L)
{
- if (lua_gettop(L) < 6)
+ if (lua_gettop(L) < 5) {
return luaL_error(L, "Usage: netbox.encode_update(ibuf, sync, "
- "schema_version, user, password, greeting)");
+ "user, password, greeting)");
+ }
struct mpstream stream;
size_t svp = netbox_prepare_request(L, &stream, IPROTO_AUTH);
size_t user_len;
- const char *user = lua_tolstring(L, 4, &user_len);
+ const char *user = lua_tolstring(L, 3, &user_len);
size_t password_len;
- const char *password = lua_tolstring(L, 5, &password_len);
+ const char *password = lua_tolstring(L, 4, &password_len);
size_t salt_len;
- const char *salt = lua_tolstring(L, 6, &salt_len);
+ const char *salt = lua_tolstring(L, 5, &salt_len);
if (salt_len < SCRAMBLE_SIZE)
return luaL_error(L, "Invalid salt");
@@ -160,9 +156,10 @@ netbox_encode_auth(lua_State *L)
static int
netbox_encode_call_impl(lua_State *L, enum iproto_type type)
{
- if (lua_gettop(L) < 5)
+ if (lua_gettop(L) < 4) {
return luaL_error(L, "Usage: netbox.encode_call(ibuf, sync, "
- "schema_version, function_name, args)");
+ "function_name, args)");
+ }
struct mpstream stream;
size_t svp = netbox_prepare_request(L, &stream, type);
@@ -171,13 +168,13 @@ netbox_encode_call_impl(lua_State *L, enum iproto_type type)
/* encode proc name */
size_t name_len;
- const char *name = lua_tolstring(L, 4, &name_len);
+ const char *name = lua_tolstring(L, 3, &name_len);
luamp_encode_uint(cfg, &stream, IPROTO_FUNCTION_NAME);
luamp_encode_str(cfg, &stream, name, name_len);
/* encode args */
luamp_encode_uint(cfg, &stream, IPROTO_TUPLE);
- luamp_encode_tuple(L, cfg, &stream, 5);
+ luamp_encode_tuple(L, cfg, &stream, 4);
netbox_encode_request(&stream, svp);
return 0;
@@ -198,9 +195,10 @@ netbox_encode_call(lua_State *L)
static int
netbox_encode_eval(lua_State *L)
{
- if (lua_gettop(L) < 5)
+ if (lua_gettop(L) < 4) {
return luaL_error(L, "Usage: netbox.encode_eval(ibuf, sync, "
- "schema_version, expr, args)");
+ "expr, args)");
+ }
struct mpstream stream;
size_t svp = netbox_prepare_request(L, &stream, IPROTO_EVAL);
@@ -209,13 +207,13 @@ netbox_encode_eval(lua_State *L)
/* encode expr */
size_t expr_len;
- const char *expr = lua_tolstring(L, 4, &expr_len);
+ const char *expr = lua_tolstring(L, 3, &expr_len);
luamp_encode_uint(cfg, &stream, IPROTO_EXPR);
luamp_encode_str(cfg, &stream, expr, expr_len);
/* encode args */
luamp_encode_uint(cfg, &stream, IPROTO_TUPLE);
- luamp_encode_tuple(L, cfg, &stream, 5);
+ luamp_encode_tuple(L, cfg, &stream, 4);
netbox_encode_request(&stream, svp);
return 0;
@@ -224,21 +222,22 @@ netbox_encode_eval(lua_State *L)
static int
netbox_encode_select(lua_State *L)
{
- if (lua_gettop(L) < 9)
+ if (lua_gettop(L) < 8) {
return luaL_error(L, "Usage netbox.encode_select(ibuf, sync, "
- "schema_version, space_id, index_id, iterator, "
- "offset, limit, key)");
+ "space_id, index_id, iterator, offset, "
+ "limit, key)");
+ }
struct mpstream stream;
size_t svp = netbox_prepare_request(L, &stream, IPROTO_SELECT);
luamp_encode_map(cfg, &stream, 6);
- uint32_t space_id = lua_tonumber(L, 4);
- uint32_t index_id = lua_tonumber(L, 5);
- int iterator = lua_tointeger(L, 6);
- uint32_t offset = lua_tonumber(L, 7);
- uint32_t limit = lua_tonumber(L, 8);
+ uint32_t space_id = lua_tonumber(L, 3);
+ uint32_t index_id = lua_tonumber(L, 4);
+ int iterator = lua_tointeger(L, 5);
+ uint32_t offset = lua_tonumber(L, 6);
+ uint32_t limit = lua_tonumber(L, 7);
/* encode space_id */
luamp_encode_uint(cfg, &stream, IPROTO_SPACE_ID);
@@ -262,7 +261,7 @@ netbox_encode_select(lua_State *L)
/* encode key */
luamp_encode_uint(cfg, &stream, IPROTO_KEY);
- luamp_convert_key(L, cfg, &stream, 9);
+ luamp_convert_key(L, cfg, &stream, 8);
netbox_encode_request(&stream, svp);
return 0;
@@ -271,24 +270,23 @@ netbox_encode_select(lua_State *L)
static inline int
netbox_encode_insert_or_replace(lua_State *L, uint32_t reqtype)
{
- if (lua_gettop(L) < 5)
+ if (lua_gettop(L) < 4) {
return luaL_error(L, "Usage: netbox.encode_insert(ibuf, sync, "
- "schema_version, space_id, tuple)");
- lua_settop(L, 5);
-
+ "space_id, tuple)");
+ }
struct mpstream stream;
size_t svp = netbox_prepare_request(L, &stream, reqtype);
luamp_encode_map(cfg, &stream, 2);
/* encode space_id */
- uint32_t space_id = lua_tonumber(L, 4);
+ uint32_t space_id = lua_tonumber(L, 3);
luamp_encode_uint(cfg, &stream, IPROTO_SPACE_ID);
luamp_encode_uint(cfg, &stream, space_id);
/* encode args */
luamp_encode_uint(cfg, &stream, IPROTO_TUPLE);
- luamp_encode_tuple(L, cfg, &stream, 5);
+ luamp_encode_tuple(L, cfg, &stream, 4);
netbox_encode_request(&stream, svp);
return 0;
@@ -309,9 +307,10 @@ netbox_encode_replace(lua_State *L)
static int
netbox_encode_delete(lua_State *L)
{
- if (lua_gettop(L) < 6)
+ if (lua_gettop(L) < 5) {
return luaL_error(L, "Usage: netbox.encode_delete(ibuf, sync, "
- "schema_version, space_id, index_id, key)");
+ "space_id, index_id, key)");
+ }
struct mpstream stream;
size_t svp = netbox_prepare_request(L, &stream, IPROTO_DELETE);
@@ -319,18 +318,18 @@ netbox_encode_delete(lua_State *L)
luamp_encode_map(cfg, &stream, 3);
/* encode space_id */
- uint32_t space_id = lua_tonumber(L, 4);
+ uint32_t space_id = lua_tonumber(L, 3);
luamp_encode_uint(cfg, &stream, IPROTO_SPACE_ID);
luamp_encode_uint(cfg, &stream, space_id);
/* encode space_id */
- uint32_t index_id = lua_tonumber(L, 5);
+ uint32_t index_id = lua_tonumber(L, 4);
luamp_encode_uint(cfg, &stream, IPROTO_INDEX_ID);
luamp_encode_uint(cfg, &stream, index_id);
/* encode key */
luamp_encode_uint(cfg, &stream, IPROTO_KEY);
- luamp_convert_key(L, cfg, &stream, 6);
+ luamp_convert_key(L, cfg, &stream, 5);
netbox_encode_request(&stream, svp);
return 0;
@@ -339,9 +338,10 @@ netbox_encode_delete(lua_State *L)
static int
netbox_encode_update(lua_State *L)
{
- if (lua_gettop(L) < 7)
+ if (lua_gettop(L) < 6) {
return luaL_error(L, "Usage: netbox.encode_update(ibuf, sync, "
- "schema_version, space_id, index_id, key, ops)");
+ "space_id, index_id, key, ops)");
+ }
struct mpstream stream;
size_t svp = netbox_prepare_request(L, &stream, IPROTO_UPDATE);
@@ -349,12 +349,12 @@ netbox_encode_update(lua_State *L)
luamp_encode_map(cfg, &stream, 5);
/* encode space_id */
- uint32_t space_id = lua_tonumber(L, 4);
+ uint32_t space_id = lua_tonumber(L, 3);
luamp_encode_uint(cfg, &stream, IPROTO_SPACE_ID);
luamp_encode_uint(cfg, &stream, space_id);
/* encode index_id */
- uint32_t index_id = lua_tonumber(L, 5);
+ uint32_t index_id = lua_tonumber(L, 4);
luamp_encode_uint(cfg, &stream, IPROTO_INDEX_ID);
luamp_encode_uint(cfg, &stream, index_id);
@@ -365,12 +365,12 @@ netbox_encode_update(lua_State *L)
/* encode in reverse order for speedup - see luamp_encode() code */
/* encode ops */
luamp_encode_uint(cfg, &stream, IPROTO_TUPLE);
- luamp_encode_tuple(L, cfg, &stream, 7);
+ luamp_encode_tuple(L, cfg, &stream, 6);
lua_pop(L, 1); /* ops */
/* encode key */
luamp_encode_uint(cfg, &stream, IPROTO_KEY);
- luamp_convert_key(L, cfg, &stream, 6);
+ luamp_convert_key(L, cfg, &stream, 5);
netbox_encode_request(&stream, svp);
return 0;
@@ -379,9 +379,10 @@ netbox_encode_update(lua_State *L)
static int
netbox_encode_upsert(lua_State *L)
{
- if (lua_gettop(L) != 6)
+ if (lua_gettop(L) != 5) {
return luaL_error(L, "Usage: netbox.encode_upsert(ibuf, sync, "
- "schema_version, space_id, tuple, ops)");
+ "space_id, tuple, ops)");
+ }
struct mpstream stream;
size_t svp = netbox_prepare_request(L, &stream, IPROTO_UPSERT);
@@ -389,7 +390,7 @@ netbox_encode_upsert(lua_State *L)
luamp_encode_map(cfg, &stream, 4);
/* encode space_id */
- uint32_t space_id = lua_tonumber(L, 4);
+ uint32_t space_id = lua_tonumber(L, 3);
luamp_encode_uint(cfg, &stream, IPROTO_SPACE_ID);
luamp_encode_uint(cfg, &stream, space_id);
@@ -400,12 +401,12 @@ netbox_encode_upsert(lua_State *L)
/* encode in reverse order for speedup - see luamp_encode() code */
/* encode ops */
luamp_encode_uint(cfg, &stream, IPROTO_OPS);
- luamp_encode_tuple(L, cfg, &stream, 6);
+ luamp_encode_tuple(L, cfg, &stream, 5);
lua_pop(L, 1); /* ops */
/* encode tuple */
luamp_encode_uint(cfg, &stream, IPROTO_TUPLE);
- luamp_encode_tuple(L, cfg, &stream, 5);
+ luamp_encode_tuple(L, cfg, &stream, 4);
netbox_encode_request(&stream, svp);
return 0;
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index 96f528963..a2b7b39d2 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -45,7 +45,6 @@ local IPROTO_GREETING_SIZE = 128
local E_UNKNOWN = box.error.UNKNOWN
local E_NO_CONNECTION = box.error.NO_CONNECTION
local E_TIMEOUT = box.error.TIMEOUT
-local E_WRONG_SCHEMA_VERSION = box.error.WRONG_SCHEMA_VERSION
local E_PROC_LUA = box.error.PROC_LUA
-- utility tables
@@ -93,7 +92,7 @@ local method_encoder = {
max = internal.encode_select,
count = internal.encode_call,
-- inject raw data into connection, used by console and tests
- inject = function(buf, id, schema_version, bytes)
+ inject = function(buf, id, bytes)
local ptr = buf:reserve(#bytes)
ffi.copy(ptr, bytes, #bytes)
buf.wpos = ptr + #bytes
@@ -158,7 +157,7 @@ end
-- * implements protocols; concurrent perform_request()-s benefit from
-- multiplexing support in the protocol;
-- * schema-aware (optional) - snoops responses and initiates
--- schema reload when a request fails due to schema version mismatch;
+-- schema reload when a response has a new schema version;
-- * delivers transport events via the callback.
--
-- Transport state machine:
@@ -415,7 +414,7 @@ local function create_transport(host, port, user, password, callback,
-- @retval nil, error Error occured.
-- @retval not nil Future object.
--
- local function perform_async_request(buffer, method, schema_version, ...)
+ local function perform_async_request(buffer, method, ...)
if state ~= 'active' then
return nil, box.error.new({code = last_errno or E_NO_CONNECTION,
reason = last_error})
@@ -426,11 +425,10 @@ local function create_transport(host, port, user, password, callback,
worker_fiber:wakeup()
end
local id = next_request_id
- method_encoder[method](send_buf, id, schema_version, ...)
+ method_encoder[method](send_buf, id, ...)
next_request_id = next_id(id)
- local request = setmetatable(table_new(0, 7), request_mt)
+ local request = setmetatable(table_new(0, 6), request_mt)
request.method = method
- request.schema_version = schema_version
request.buffer = buffer
request.id = id
requests[id] = request
@@ -440,9 +438,9 @@ local function create_transport(host, port, user, password, callback,
--
-- Send a request and wait for response.
--
- local function perform_request(timeout, buffer, method, schema_version, ...)
+ local function perform_request(timeout, buffer, method, ...)
local request, err =
- perform_async_request(buffer, method, schema_version, ...)
+ perform_async_request(buffer, method, ...)
if not request then
return last_errno or E_NO_CONNECTION, last_error
end
@@ -578,7 +576,7 @@ local function create_transport(host, port, user, password, callback,
log.warn("Netbox text protocol support is deprecated since 1.10, "..
"please use require('console').connect() instead")
local setup_delimiter = 'require("console").delimiter("$EOF$")\n'
- method_encoder.inject(send_buf, nil, nil, setup_delimiter)
+ method_encoder.inject(send_buf, nil, setup_delimiter)
local err, response = send_and_recv_console()
if err then
return error_sm(err, response)
@@ -619,7 +617,7 @@ local function create_transport(host, port, user, password, callback,
set_state('fetch_schema')
return iproto_schema_sm()
end
- encode_auth(send_buf, new_request_id(), nil, user, password, salt)
+ encode_auth(send_buf, new_request_id(), user, password, salt)
local err, hdr, body_rpos, body_end = send_and_recv_iproto()
if err then
return error_sm(err, hdr)
@@ -642,11 +640,9 @@ local function create_transport(host, port, user, password, callback,
local select2_id = new_request_id()
local response = {}
-- fetch everything from space _vspace, 2 = ITER_ALL
- encode_select(send_buf, select1_id, nil, VSPACE_ID, 0, 2, 0,
- 0xFFFFFFFF, nil)
+ encode_select(send_buf, select1_id, VSPACE_ID, 0, 2, 0, 0xFFFFFFFF, nil)
-- fetch everything from space _vindex, 2 = ITER_ALL
- encode_select(send_buf, select2_id, nil, VINDEX_ID, 0, 2, 0,
- 0xFFFFFFFF, nil)
+ encode_select(send_buf, select2_id, VINDEX_ID, 0, 2, 0, 0xFFFFFFFF, nil)
schema_version = nil -- any schema_version will do provided that
-- it is consistent across responses
repeat
@@ -692,8 +688,7 @@ local function create_transport(host, port, user, password, callback,
-- Sic: self.schema_version will be updated only after reload.
local body
body, body_end = decode(body_rpos)
- set_state('fetch_schema',
- E_WRONG_SCHEMA_VERSION, body[IPROTO_ERROR_KEY])
+ set_state('fetch_schema')
return iproto_schema_sm(schema_version)
end
return iproto_sm(schema_version)
@@ -977,57 +972,44 @@ function remote_methods:_request(method, opts, ...)
local transport = self._transport
local buffer = opts and opts.buffer
if opts and opts.is_async then
- return transport.perform_async_request(buffer, method, 0, ...)
+ return transport.perform_async_request(buffer, method, ...)
end
- local this_fiber = fiber_self()
- local perform_request = transport.perform_request
- local wait_state = transport.wait_state
- local deadline = nil
+ local deadline
if opts and opts.timeout then
-- conn.space:request(, { timeout = timeout })
deadline = fiber_clock() + opts.timeout
else
-- conn:timeout(timeout).space:request()
-- @deprecated since 1.7.4
- deadline = self._deadlines[this_fiber]
+ deadline = self._deadlines[fiber_self()]
end
- local err, res
- repeat
- local timeout = deadline and max(0, deadline - fiber_clock())
- if self.state ~= 'active' then
- wait_state('active', timeout)
- timeout = deadline and max(0, deadline - fiber_clock())
- end
- err, res = perform_request(timeout, buffer, method,
- self.schema_version, ...)
- if not err then
- return res
- elseif err == E_WRONG_SCHEMA_VERSION then
- err = nil
- end
- until err
- box.error({code = err, reason = res})
+ local timeout = deadline and max(0, deadline - fiber_clock())
+ if self.state ~= 'active' then
+ transport.wait_state('active', timeout)
+ timeout = deadline and max(0, deadline - fiber_clock())
+ end
+ local err, res = transport.perform_request(timeout, buffer, method, ...)
+ if err then
+ box.error({code = err, reason = res})
+ end
+ -- Try to wait until a schema is reloaded if needed.
+ -- Regardless of reloading result, the main response is
+ -- returned, since it does not depend on any schema things.
+ if self.state == 'fetch_schema' then
+ timeout = deadline and max(0, deadline - fiber_clock())
+ transport.wait_state('active', timeout)
+ end
+ return res
end
function remote_methods:ping(opts)
check_remote_arg(self, 'ping')
- local timeout = opts and opts.timeout
- if timeout == nil then
- -- conn:timeout(timeout):ping()
- -- @deprecated since 1.7.4
- local deadline = self._deadlines[fiber_self()]
- timeout = deadline and max(0, deadline - fiber_clock())
- or (opts and opts.timeout)
- end
- local err = self._transport.perform_request(timeout, nil, 'ping',
- self.schema_version)
- return not err or err == E_WRONG_SCHEMA_VERSION
+ return (pcall(self._request, self, 'ping', opts))
end
function remote_methods:reload_schema()
check_remote_arg(self, 'reload_schema')
- self:_request('select', nil, VSPACE_ID, 0, box.index.GE, 0, 0xFFFFFFFF,
- nil)
+ self:ping()
end
-- @deprecated since 1.7.4
@@ -1200,10 +1182,10 @@ function console_methods:eval(line, timeout)
end
if self.protocol == 'Binary' then
local loader = 'return require("console").eval(...)'
- err, res = pr(timeout, nil, 'eval', nil, loader, {line})
+ err, res = pr(timeout, nil, 'eval', loader, {line})
else
assert(self.protocol == 'Lua console')
- err, res = pr(timeout, nil, 'inject', nil, line..'$EOF$\n')
+ err, res = pr(timeout, nil, 'inject', line..'$EOF$\n')
end
if err then
box.error({code = err, reason = res})
diff --git a/test/box/net.box.result b/test/box/net.box.result
index aaa421ec6..3c494696b 100644
--- a/test/box/net.box.result
+++ b/test/box/net.box.result
@@ -28,7 +28,7 @@ function x_select(cn, space_id, index_id, iterator, offset, limit, key, opts)
return cn:_request('select', opts, space_id, index_id, iterator,
offset, limit, key)
end
-function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', nil, '\x80') end
+function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', '\x80') end
test_run:cmd("setopt delimiter ''");
---
...
@@ -2377,7 +2377,7 @@ c.space.test:delete{1}
--
-- Break a connection to test reconnect_after.
--
-_ = c._transport.perform_request(nil, nil, 'inject', nil, '\x80')
+_ = c._transport.perform_request(nil, nil, 'inject', '\x80')
---
...
c.state
@@ -2940,7 +2940,7 @@ c = net:connect(box.cfg.listen, {reconnect_after = 0.01})
future = c:call('long_function', {1, 2, 3}, {is_async = true})
---
...
-_ = c._transport.perform_request(nil, nil, 'inject', nil, '\x80')
+_ = c._transport.perform_request(nil, nil, 'inject', '\x80')
---
...
while not c:is_connected() do fiber.sleep(0.01) end
diff --git a/test/box/net.box.test.lua b/test/box/net.box.test.lua
index 82c538fbe..9a826dc6d 100644
--- a/test/box/net.box.test.lua
+++ b/test/box/net.box.test.lua
@@ -11,7 +11,7 @@ function x_select(cn, space_id, index_id, iterator, offset, limit, key, opts)
return cn:_request('select', opts, space_id, index_id, iterator,
offset, limit, key)
end
-function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', nil, '\x80') end
+function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', '\x80') end
test_run:cmd("setopt delimiter ''");
LISTEN = require('uri').parse(box.cfg.listen)
@@ -965,7 +965,7 @@ c.space.test:delete{1}
--
-- Break a connection to test reconnect_after.
--
-_ = c._transport.perform_request(nil, nil, 'inject', nil, '\x80')
+_ = c._transport.perform_request(nil, nil, 'inject', '\x80')
c.state
while not c:is_connected() do fiber.sleep(0.01) end
c:ping()
@@ -1169,7 +1169,7 @@ future:is_ready()
--
c = net:connect(box.cfg.listen, {reconnect_after = 0.01})
future = c:call('long_function', {1, 2, 3}, {is_async = true})
-_ = c._transport.perform_request(nil, nil, 'inject', nil, '\x80')
+_ = c._transport.perform_request(nil, nil, 'inject', '\x80')
while not c:is_connected() do fiber.sleep(0.01) end
future:wait_result(100)
future:result()
--
2.15.1 (Apple Git-101)
^ permalink raw reply [flat|nested] 32+ messages in thread
* [PATCH 8/8] netbox: implement perform_request via async version
2018-04-16 18:39 [PATCH 0/8] netbox: introduce fiber-async API Vladislav Shpilevoy
` (6 preceding siblings ...)
2018-04-16 18:39 ` [PATCH 7/8] netbox: remove schema_version from requests Vladislav Shpilevoy
@ 2018-04-16 18:39 ` Vladislav Shpilevoy
2018-04-23 16:47 ` Vladimir Davydov
7 siblings, 1 reply; 32+ messages in thread
From: Vladislav Shpilevoy @ 2018-04-16 18:39 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev
When async netbox was introduced, it is not needed to hold a
special sync implementation - it can be just async call + waiting
for a response.
Closes #3107
---
src/box/lua/net_box.lua | 26 +++++++++-----------------
1 file changed, 9 insertions(+), 17 deletions(-)
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index a2b7b39d2..6b33f70c7 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -437,24 +437,16 @@ local function create_transport(host, port, user, password, callback,
--
-- Send a request and wait for response.
+ -- @retval nil, error Error occured.
+ -- @retval not nil Response object.
--
local function perform_request(timeout, buffer, method, ...)
local request, err =
perform_async_request(buffer, method, ...)
if not request then
- return last_errno or E_NO_CONNECTION, last_error
+ return nil, err
end
- request.client = fiber_self()
- local id = request.id
- local deadline = fiber_clock() + (timeout or TIMEOUT_INFINITY)
- repeat
- local timeout = max(0, deadline - fiber_clock())
- if not state_cond:wait(timeout) then
- requests[id] = nil
- return E_TIMEOUT, 'Timeout exceeded'
- end
- until requests[id] == nil -- i.e. completed (beware spurious wakeups)
- return request.errno, request.response
+ return request:wait_result(timeout)
end
local function dispatch_response_iproto(hdr, body_rpos, body_end)
@@ -988,9 +980,9 @@ function remote_methods:_request(method, opts, ...)
transport.wait_state('active', timeout)
timeout = deadline and max(0, deadline - fiber_clock())
end
- local err, res = transport.perform_request(timeout, buffer, method, ...)
+ local res, err = transport.perform_request(timeout, buffer, method, ...)
if err then
- box.error({code = err, reason = res})
+ box.error.raise(err)
end
-- Try to wait until a schema is reloaded if needed.
-- Regardless of reloading result, the main response is
@@ -1182,13 +1174,13 @@ function console_methods:eval(line, timeout)
end
if self.protocol == 'Binary' then
local loader = 'return require("console").eval(...)'
- err, res = pr(timeout, nil, 'eval', loader, {line})
+ res, err = pr(timeout, nil, 'eval', loader, {line})
else
assert(self.protocol == 'Lua console')
- err, res = pr(timeout, nil, 'inject', line..'$EOF$\n')
+ res, err = pr(timeout, nil, 'inject', line..'$EOF$\n')
end
if err then
- box.error({code = err, reason = res})
+ box.error.raise(err)
end
return res[1] or res
end
--
2.15.1 (Apple Git-101)
^ permalink raw reply [flat|nested] 32+ messages in thread
* Re: [tarantool-patches] [PATCH 5/8] test: fix unstable test
2018-04-16 18:39 ` [PATCH 5/8] test: fix unstable test Vladislav Shpilevoy
@ 2018-04-22 5:32 ` Kirill Yukhin
2018-05-08 15:50 ` Konstantin Osipov
1 sibling, 0 replies; 32+ messages in thread
From: Kirill Yukhin @ 2018-04-22 5:32 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev
Hello Vlad,
On 16 апр 21:39, Vladislav Shpilevoy wrote:
> ---
> test/box/errinj.result | 3 +++
> test/box/errinj.test.lua | 1 +
> 2 files changed, 4 insertions(+)
I am so tired of this flaky that I've decided to commit this patch
out-of-cycle. Committed to 1.10 and 2.1 branches.
Could you pls evict it from your patch-set?
--
Rregards, Kirill Yukhin
^ permalink raw reply [flat|nested] 32+ messages in thread
* [tarantool-patches] Re: [PATCH 6/8] netbox: introduce fiber-async API
2018-04-16 18:39 ` [PATCH 6/8] netbox: introduce fiber-async API Vladislav Shpilevoy
@ 2018-04-23 12:31 ` Alexander Turenko
2018-04-23 18:59 ` Vladislav Shpilevoy
2018-04-23 16:44 ` Vladimir Davydov
1 sibling, 1 reply; 32+ messages in thread
From: Alexander Turenko @ 2018-04-23 12:31 UTC (permalink / raw)
To: Vladislav Shpilevoy; +Cc: tarantool-patches
Hi Vlad!
Minor wording comments are below (Vlad asks to track it here).
WBR, Alexander Turenko.
On Mon, Apr 16, 2018 at 09:39:16PM +0300, Vladislav Shpilevoy wrote:
> Now any netbox call blocks a caller-fiber until a result is read
> from a socket, or time is out. To use it asynchronously it is
> necessary to create a fiber per request. Sometimes it is
> unwanted - for example if RPS is very high (for example, about
> 100k), and latency is about 1 second. Or when it is neccessary
> to send multiple requests in paralles and then collect responses
> (map-reduce).
Paralles -> parallel.
> + function request_index:wait_result(timeout)
> ...
> + if not self:is_ready() then
> + -- When a response is ready before timeout, the
> + -- waiting client is waked up spuriously.
Do you mean prematurely?
^ permalink raw reply [flat|nested] 32+ messages in thread
* Re: [PATCH 1/8] lua: fix box.error.raise
2018-04-16 18:39 ` [PATCH 1/8] lua: fix box.error.raise Vladislav Shpilevoy
@ 2018-04-23 16:19 ` Vladimir Davydov
2018-05-08 15:36 ` [tarantool-patches] " Konstantin Osipov
1 sibling, 0 replies; 32+ messages in thread
From: Vladimir Davydov @ 2018-04-23 16:19 UTC (permalink / raw)
To: Vladislav Shpilevoy; +Cc: tarantool-patches
On Mon, Apr 16, 2018 at 09:39:11PM +0300, Vladislav Shpilevoy wrote:
> It did not work because raise is implemented as __index metatable
> member, and error() is __call metatable member. The second one
> takes additional implicit argument - self.
> ---
> src/box/lua/error.cc | 78 +++++++++++++++++++++++++++++++++-----------------
> test/box/misc.result | 39 +++++++++++++++++++++++++
> test/box/misc.test.lua | 18 ++++++++++++
> 3 files changed, 108 insertions(+), 27 deletions(-)
Ack
^ permalink raw reply [flat|nested] 32+ messages in thread
* Re: [PATCH 2/8] lua: allow to create and error object with no throw
2018-04-16 18:39 ` [PATCH 2/8] lua: allow to create and error object with no throw Vladislav Shpilevoy
@ 2018-04-23 16:20 ` Vladimir Davydov
2018-05-08 15:37 ` [tarantool-patches] " Konstantin Osipov
1 sibling, 0 replies; 32+ messages in thread
From: Vladimir Davydov @ 2018-04-23 16:20 UTC (permalink / raw)
To: Vladislav Shpilevoy; +Cc: tarantool-patches
On Mon, Apr 16, 2018 at 09:39:12PM +0300, Vladislav Shpilevoy wrote:
> It is needed to return error via 'nil, error_object' notation,
> and to store an error object to return it later.
>
> Closes #3031
> ---
> src/box/lua/error.cc | 15 +++++++++++++++
> test/box/misc.result | 25 +++++++++++++++++++++++++
> test/box/misc.test.lua | 10 ++++++++++
> 3 files changed, 50 insertions(+)
Ack
^ permalink raw reply [flat|nested] 32+ messages in thread
* Re: [PATCH 3/8] console: fix a bug in interactive readline usage
2018-04-16 18:39 ` [PATCH 3/8] console: fix a bug in interactive readline usage Vladislav Shpilevoy
@ 2018-04-23 16:20 ` Vladimir Davydov
2018-05-08 15:37 ` [tarantool-patches] " Konstantin Osipov
1 sibling, 0 replies; 32+ messages in thread
From: Vladimir Davydov @ 2018-04-23 16:20 UTC (permalink / raw)
To: Vladislav Shpilevoy; +Cc: tarantool-patches
On Mon, Apr 16, 2018 at 09:39:13PM +0300, Vladislav Shpilevoy wrote:
> Spurious wakeups are possible in console, that makes readline
> think that there are some data on stdin. Waked up readline
> returns garbage instead of string, that crashes a server on
> assertion in Lua.
>
> Closes #3343
> ---
> src/box/lua/console.c | 5 +++--
> 1 file changed, 3 insertions(+), 2 deletions(-)
Ack
^ permalink raw reply [flat|nested] 32+ messages in thread
* Re: [PATCH 4/8] netbox: extend codec with 'decode' methods
2018-04-16 18:39 ` [PATCH 4/8] netbox: extend codec with 'decode' methods Vladislav Shpilevoy
@ 2018-04-23 16:42 ` Vladimir Davydov
2018-04-23 18:59 ` [tarantool-patches] " Vladislav Shpilevoy
2018-05-08 15:49 ` [tarantool-patches] " Konstantin Osipov
1 sibling, 1 reply; 32+ messages in thread
From: Vladimir Davydov @ 2018-04-23 16:42 UTC (permalink / raw)
To: Vladislav Shpilevoy; +Cc: tarantool-patches
On Mon, Apr 16, 2018 at 09:39:14PM +0300, Vladislav Shpilevoy wrote:
> Netbox has a table 'method_codec' that is used to encode a
> request by a method name. But a response is decoded out of codec.
> It leads to
> 1) decoding into Lua tables before decoding into tuples where
> needed - it is double decoding and produces a lot of garbage;
> 2) each method contains hacks like one_tuple(), or single tuple
> check.
>
> These things can not be fixed with no real codec instead of
> encoder only.
>
> Also global table with decoders is needed for #3107, where
> a request could be sent async with no fiber blocking. An async
> response when received already does not have a call context - it
> has only method name.
>
> Needed for #3107
> ---
> src/box/lua/net_box.lua | 116 +++++++++++++++++++++++++++++-------------------
> test/box/net.box.result | 14 ++++++
> test/box/sql.result | 2 +
> 3 files changed, 87 insertions(+), 45 deletions(-)
>
> diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
> index 4ed2b375d..3868cdf1c 100644
> --- a/src/box/lua/net_box.lua
> +++ b/src/box/lua/net_box.lua
> @@ -50,7 +50,34 @@ local E_PROC_LUA = box.error.PROC_LUA
>
> -- utility tables
> local is_final_state = {closed = 1, error = 1}
> -local method_codec = {
> +
> +local function decode_nil(...) end
> +local function decode_nothing(...) return ... end
decode_nop, may be?
> +local function decode_one_tuple(response)
> + if response[1] then
> + return box.tuple.new(response[1])
> + end
> +end
> +local function decode_single_tuple(response)
> + if response[2] then
> + return nil, box.error.MORE_THAN_ONE_TUPLE
> + end
> + if response[1] then
> + return box.tuple.new(response[1])
> + end
> +end
Do we really need this MORE_THAN_ONE_TUPLE error? Can't we use
decode_one_tuple in 'get' as well? I ask, because having two methods
called decode_one_tuple and decode_single_tuple is confusing IMO -
it's unclear what's the difference between them.
> +local function decode_select(response)
> + setmetatable(response, sequence_mt)
> + for i, v in pairs(response) do
> + response[i] = box.tuple.new(v)
> + end
> + return response
> +end
> +local function decode_count(response)
> + return response[1]
> +end
> +
> +local method_encoder = {
> ping = internal.encode_ping,
> call_16 = internal.encode_call_16,
> call_17 = internal.encode_call,
> @@ -61,6 +88,10 @@ local method_codec = {
> update = internal.encode_update,
> upsert = internal.encode_upsert,
> select = internal.encode_select,
> + get = internal.encode_select,
> + min = internal.encode_select,
> + max = internal.encode_select,
> + count = internal.encode_call,
> -- inject raw data into connection, used by console and tests
> inject = function(buf, id, schema_version, bytes)
> local ptr = buf:reserve(#bytes)
> @@ -69,6 +100,24 @@ local method_codec = {
> end
> }
>
> +local method_decoder = {
> + ping = decode_nil,
> + call_16 = decode_select,
> + call_17 = decode_nothing,
> + eval = decode_nothing,
> + insert = decode_one_tuple,
> + replace = decode_one_tuple,
> + delete = decode_one_tuple,
> + update = decode_one_tuple,
> + upsert = decode_nil,
> + select = decode_select,
> + get = decode_single_tuple,
> + min = decode_single_tuple,
> + max = decode_single_tuple,
> + count = decode_count,
> + inject = decode_nothing,
> +}
> +
> local function next_id(id) return band(id + 1, 0x7FFFFFFF) end
>
> --
> @@ -1144,9 +1175,8 @@ index_metatable = function(remote)
> if opts and opts.buffer then
> error("index:min() doesn't support `buffer` argument")
> end
> - local res = remote:_request('select', opts, self.space.id, self.id,
> - box.index.GE, 0, 1, key)
> - return one_tuple(res)
> + return remote:_request('get', opts, self.space.id, self.id,
Should be 'min'?
> + box.index.GE, 0, 1, key)
> end
>
> function methods:max(key, opts)
> @@ -1154,9 +1184,8 @@ index_metatable = function(remote)
> if opts and opts.buffer then
> error("index:max() doesn't support `buffer` argument")
> end
> - local res = remote:_request('select', opts, self.space.id, self.id,
> - box.index.LE, 0, 1, key)
> - return one_tuple(res)
> + return remote:_request('get', opts, self.space.id, self.id,
Should be 'max'?
> + box.index.LE, 0, 1, key)
> end
>
> function methods:count(key, opts)
^ permalink raw reply [flat|nested] 32+ messages in thread
* Re: [PATCH 6/8] netbox: introduce fiber-async API
2018-04-16 18:39 ` [PATCH 6/8] netbox: introduce fiber-async API Vladislav Shpilevoy
2018-04-23 12:31 ` [tarantool-patches] " Alexander Turenko
@ 2018-04-23 16:44 ` Vladimir Davydov
2018-04-23 18:59 ` [tarantool-patches] " Vladislav Shpilevoy
1 sibling, 1 reply; 32+ messages in thread
From: Vladimir Davydov @ 2018-04-23 16:44 UTC (permalink / raw)
To: Vladislav Shpilevoy; +Cc: tarantool-patches
On Mon, Apr 16, 2018 at 09:39:16PM +0300, Vladislav Shpilevoy wrote:
> Now any netbox call blocks a caller-fiber until a result is read
> from a socket, or time is out. To use it asynchronously it is
> necessary to create a fiber per request. Sometimes it is
> unwanted - for example if RPS is very high (for example, about
> 100k), and latency is about 1 second. Or when it is neccessary
> to send multiple requests in paralles and then collect responses
> (map-reduce).
>
> The patch introduces a new option for all netbox requests:
> is_async. With this option any called netbox method returns
> immediately (but still yields for a moment) a 'future' object.
>
> By a future object a user can check if the request is finalized,
> get a result or error, wait for a timeout, discard a response.
>
> Example of is_async usage:
> future = conn:call(func, {params}, {..., is_async = true})
> -- Do some work ...
> if not future.is_ready() then
> result, err = future:wait_result(timeout)
> end
> -- Or:
> result, error = future:result()
>
> A future:result() and :wait_result() returns either an error or
> a response in the same format, as the sync versions of the called
> methods.
>
> Part of #3107
> ---
> src/box/lua/net_box.lua | 159 ++++++++++++--
> test/box/net.box.result | 519 +++++++++++++++++++++++++++++++++++++++++++++-
> test/box/net.box.test.lua | 186 ++++++++++++++++-
> 3 files changed, 836 insertions(+), 28 deletions(-)
>
> diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
> index 3868cdf1c..96f528963 100644
> --- a/src/box/lua/net_box.lua
> +++ b/src/box/lua/net_box.lua
> @@ -214,12 +214,18 @@ local function create_transport(host, port, user, password, callback,
> local last_error
> local state_cond = fiber.cond() -- signaled when the state changes
>
> - -- requests: requests currently 'in flight', keyed by a request id;
> - -- value refs are weak hence if a client dies unexpectedly,
> - -- GC cleans the mess. Client submits a request and waits on state_cond.
> - -- If the reponse arrives within the timeout, the worker wakes
> - -- client fiber explicitly. Otherwize, wait on state_cond completes and
> - -- the client reports E_TIMEOUT.
> + -- Async requests currently 'in flight', keyed by a request
> + -- id. Value refs are weak hence if a client dies
> + -- unexpectedly, GC cleans the mess. Client either submits a
> + -- request and waits on state_cond, OR makes an async request
> + -- and does not block until a response is received. If the
> + -- request is not async and the reponse arrives within the
> + -- timeout, the worker wakes client fiber explicitly.
> + -- Otherwize, wait on state_cond completes and the client
> + -- reports E_TIMEOUT.
> + -- Async request can not be timed out completely. Instead a
> + -- user must decide when he does not want to wait for
> + -- response anymore.
> local requests = setmetatable({}, { __mode = 'v' })
> local next_request_id = 1
>
> @@ -227,6 +233,94 @@ local function create_transport(host, port, user, password, callback,
> local send_buf = buffer.ibuf(buffer.READAHEAD)
> local recv_buf = buffer.ibuf(buffer.READAHEAD)
>
> + local function wakeup_client(client)
> + if client and client:status() ~= 'dead' then
> + client:wakeup()
> + end
> + end
> +
> + --
> + -- Async request metamethods.
> + --
> + local request_index = {}
> + --
> + -- When an async request is finalized (with ok or error - no
> + -- matter), its 'id' field is nullified by a response
> + -- dispatcher.
> + --
> + function request_index:is_ready()
> + return self.id == nil or worker_fiber == nil
> + end
> + --
> + -- When a request is finished, a result can be got from a
> + -- future object anytime.
> + -- @retval result, nil Success, the response is returned.
> + -- @retval nil, error Error occured.
> + --
> + function request_index:result()
> + if self.errno then
> + return nil, box.error.new({code = self.errno,
> + reason = self.response})
> + elseif not self.id then
> + return self.response
> + elseif not worker_fiber then
> + return nil, box.error.new(E_NO_CONNECTION)
> + else
> + return nil, box.error.new(box.error.PROC_LUA,
> + 'Response is not ready')
> + end
> + end
> + --
> + -- Wait for a response or error max timeout seconds.
> + -- @param timeout Max seconds to wait.
> + -- @retval result, nil Success, the response is returned.
> + -- @retval nil, error Error occured.
> + --
> + function request_index:wait_result(timeout)
> + if timeout then
> + if type(timeout) ~= 'number' or timeout < 0 then
> + error('Usage: future:wait_result(timeout)')
> + end
> + else
> + timeout = TIMEOUT_INFINITY
> + end
> + if not self:is_ready() then
> + -- When a response is ready before timeout, the
> + -- waiting client is waked up spuriously.
> + local old_client = self.client
> + self.client = fiber.self()
> + while timeout > 0 and not self:is_ready() do
> + local ts = fiber.clock()
> + state_cond:wait(timeout)
> + timeout = timeout - (fiber.clock() - ts)
> + end
> + self.client = old_client
> + if not self:is_ready() then
> + return nil, box.error.new(E_TIMEOUT)
> + end
> + -- It is possible that multiple fibers are waiting for
> + -- a result. In such a case a first, who got it, must
> + -- wakeup the previous waiting client. This one wakes
> + -- up another. Another wakes up third one, etc.
> + wakeup_client(old_client)
This is rather difficult for understanding IMO. Can we use a fiber.cond
instead?
> + end
> + return self:result()
> + end
> + --
> + -- Make a connection forget about the response. When it will
> + -- be received, it will be ignored.
> + --
> + function request_index:discard()
> + if self.id then
> + requests[self.id] = nil
> + self.id = nil
> + self.errno = box.error.PROC_LUA
> + self.response = 'Response is discarded'
> + end
> + end
> +
> + local request_mt = { __index = request_index }
> +
> -- STATE SWITCHING --
> local function set_state(new_state, new_errno, new_error)
> state = new_state
^ permalink raw reply [flat|nested] 32+ messages in thread
* Re: [PATCH 8/8] netbox: implement perform_request via async version
2018-04-16 18:39 ` [PATCH 8/8] netbox: implement perform_request via async version Vladislav Shpilevoy
@ 2018-04-23 16:47 ` Vladimir Davydov
2018-04-23 19:00 ` [tarantool-patches] " Vladislav Shpilevoy
0 siblings, 1 reply; 32+ messages in thread
From: Vladimir Davydov @ 2018-04-23 16:47 UTC (permalink / raw)
To: Vladislav Shpilevoy; +Cc: tarantool-patches
On Mon, Apr 16, 2018 at 09:39:18PM +0300, Vladislav Shpilevoy wrote:
> When async netbox was introduced, it is not needed to hold a
> special sync implementation - it can be just async call + waiting
> for a response.
>
> Closes #3107
> ---
> src/box/lua/net_box.lua | 26 +++++++++-----------------
> 1 file changed, 9 insertions(+), 17 deletions(-)
I'd squash this one with patch 6.
>
> diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
> index a2b7b39d2..6b33f70c7 100644
> --- a/src/box/lua/net_box.lua
> +++ b/src/box/lua/net_box.lua
> @@ -437,24 +437,16 @@ local function create_transport(host, port, user, password, callback,
>
> --
> -- Send a request and wait for response.
> + -- @retval nil, error Error occured.
> + -- @retval not nil Response object.
> --
> local function perform_request(timeout, buffer, method, ...)
> local request, err =
> perform_async_request(buffer, method, ...)
> if not request then
> - return last_errno or E_NO_CONNECTION, last_error
> + return nil, err
> end
> - request.client = fiber_self()
> - local id = request.id
> - local deadline = fiber_clock() + (timeout or TIMEOUT_INFINITY)
> - repeat
> - local timeout = max(0, deadline - fiber_clock())
> - if not state_cond:wait(timeout) then
> - requests[id] = nil
> - return E_TIMEOUT, 'Timeout exceeded'
> - end
> - until requests[id] == nil -- i.e. completed (beware spurious wakeups)
> - return request.errno, request.response
> + return request:wait_result(timeout)
> end
>
> local function dispatch_response_iproto(hdr, body_rpos, body_end)
> @@ -988,9 +980,9 @@ function remote_methods:_request(method, opts, ...)
> transport.wait_state('active', timeout)
> timeout = deadline and max(0, deadline - fiber_clock())
> end
> - local err, res = transport.perform_request(timeout, buffer, method, ...)
> + local res, err = transport.perform_request(timeout, buffer, method, ...)
> if err then
> - box.error({code = err, reason = res})
> + box.error.raise(err)
> end
> -- Try to wait until a schema is reloaded if needed.
> -- Regardless of reloading result, the main response is
> @@ -1182,13 +1174,13 @@ function console_methods:eval(line, timeout)
> end
> if self.protocol == 'Binary' then
> local loader = 'return require("console").eval(...)'
> - err, res = pr(timeout, nil, 'eval', loader, {line})
> + res, err = pr(timeout, nil, 'eval', loader, {line})
> else
> assert(self.protocol == 'Lua console')
> - err, res = pr(timeout, nil, 'inject', line..'$EOF$\n')
> + res, err = pr(timeout, nil, 'inject', line..'$EOF$\n')
> end
> if err then
> - box.error({code = err, reason = res})
> + box.error.raise(err)
> end
> return res[1] or res
> end
^ permalink raw reply [flat|nested] 32+ messages in thread
* Re: [tarantool-patches] Re: [PATCH 4/8] netbox: extend codec with 'decode' methods
2018-04-23 16:42 ` Vladimir Davydov
@ 2018-04-23 18:59 ` Vladislav Shpilevoy
2018-04-24 13:16 ` Vladimir Davydov
0 siblings, 1 reply; 32+ messages in thread
From: Vladislav Shpilevoy @ 2018-04-23 18:59 UTC (permalink / raw)
To: Vladimir Davydov; +Cc: tarantool-patches
Hello. Thanks for review! See the diff at the end of letter.
>> diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
>> index 4ed2b375d..3868cdf1c 100644
>> --- a/src/box/lua/net_box.lua
>> +++ b/src/box/lua/net_box.lua
>> @@ -50,7 +50,34 @@ local E_PROC_LUA = box.error.PROC_LUA
>>
>> -- utility tables
>> local is_final_state = {closed = 1, error = 1}
>> -local method_codec = {
>> +
>> +local function decode_nil(...) end
>
>> +local function decode_nothing(...) return ... end
>
> decode_nop, may be?
Done.
>
>> +local function decode_one_tuple(response)
>> + if response[1] then
>> + return box.tuple.new(response[1])
>> + end
>> +end
>> +local function decode_single_tuple(response)
>> + if response[2] then
>> + return nil, box.error.MORE_THAN_ONE_TUPLE
>> + end
>> + if response[1] then
>> + return box.tuple.new(response[1])
>> + end
>> +end
>
> Do we really need this MORE_THAN_ONE_TUPLE error? Can't we use
> decode_one_tuple in 'get' as well? I ask, because having two methods
> called decode_one_tuple and decode_single_tuple is confusing IMO -
> it's unclear what's the difference between them.
I agree with you, it looks ugly, but IProto has no 'get' method, so
we must emulate it via select(limit = 2) + check that a tuple is
only one. I renamed decode_single to decode_unique. Maybe it looks
not so bad.
>
>> +local function decode_select(response)
>> + setmetatable(response, sequence_mt)
>> + for i, v in pairs(response) do
>> + response[i] = box.tuple.new(v)
>> + end
>> + return response
>> +end
>> +local function decode_count(response)
>> + return response[1]
>> +end
>> +
>> +local method_encoder = {
>> ping = internal.encode_ping,
>> call_16 = internal.encode_call_16,
>> call_17 = internal.encode_call,
>> @@ -61,6 +88,10 @@ local method_codec = {
>> update = internal.encode_update,
>> upsert = internal.encode_upsert,
>> select = internal.encode_select,
>> + get = internal.encode_select,
>> + min = internal.encode_select,
>> + max = internal.encode_select,
>> + count = internal.encode_call,
>> -- inject raw data into connection, used by console and tests
>> inject = function(buf, id, schema_version, bytes)
>> local ptr = buf:reserve(#bytes)
>> @@ -69,6 +100,24 @@ local method_codec = {
>> end
>> }
>>
>> +local method_decoder = {
>> + ping = decode_nil,
>> + call_16 = decode_select,
>> + call_17 = decode_nothing,
>> + eval = decode_nothing,
>> + insert = decode_one_tuple,
>> + replace = decode_one_tuple,
>> + delete = decode_one_tuple,
>> + update = decode_one_tuple,
>> + upsert = decode_nil,
>> + select = decode_select,
>> + get = decode_single_tuple,
>> + min = decode_single_tuple,
>> + max = decode_single_tuple,
>> + count = decode_count,
>> + inject = decode_nothing,
>> +}
>> +
>> local function next_id(id) return band(id + 1, 0x7FFFFFFF) end
>>
>> --
>
>> @@ -1144,9 +1175,8 @@ index_metatable = function(remote)
>> if opts and opts.buffer then
>> error("index:min() doesn't support `buffer` argument")
>> end
>> - local res = remote:_request('select', opts, self.space.id, self.id,
>> - box.index.GE, 0, 1, key)
>> - return one_tuple(res)
>> + return remote:_request('get', opts, self.space.id, self.id,
>
> Should be 'min'?
Yes, did not notice that. Fixed on the branch.
>
>> + box.index.GE, 0, 1, key)
>> end
>>
>> function methods:max(key, opts)
>> @@ -1154,9 +1184,8 @@ index_metatable = function(remote)
>> if opts and opts.buffer then
>> error("index:max() doesn't support `buffer` argument")
>> end
>> - local res = remote:_request('select', opts, self.space.id, self.id,
>> - box.index.LE, 0, 1, key)
>> - return one_tuple(res)
>> + return remote:_request('get', opts, self.space.id, self.id,
>
> Should be 'max'?
Same, you are right. Fixed.
See the diff:
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index 3868cdf1c..53f301356 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -52,13 +52,13 @@ local E_PROC_LUA = box.error.PROC_LUA
local is_final_state = {closed = 1, error = 1}
local function decode_nil(...) end
-local function decode_nothing(...) return ... end
+local function decode_nop(...) return ... end
local function decode_one_tuple(response)
if response[1] then
return box.tuple.new(response[1])
end
end
-local function decode_single_tuple(response)
+local function decode_unique_tuple(response)
if response[2] then
return nil, box.error.MORE_THAN_ONE_TUPLE
end
@@ -103,19 +103,19 @@ local method_encoder = {
local method_decoder = {
ping = decode_nil,
call_16 = decode_select,
- call_17 = decode_nothing,
- eval = decode_nothing,
+ call_17 = decode_nop,
+ eval = decode_nop,
insert = decode_one_tuple,
replace = decode_one_tuple,
delete = decode_one_tuple,
update = decode_one_tuple,
upsert = decode_nil,
select = decode_select,
- get = decode_single_tuple,
- min = decode_single_tuple,
- max = decode_single_tuple,
+ get = decode_unique_tuple,
+ min = decode_unique_tuple,
+ max = decode_unique_tuple,
count = decode_count,
- inject = decode_nothing,
+ inject = decode_nop,
}
local function next_id(id) return band(id + 1, 0x7FFFFFFF) end
@@ -1175,7 +1175,7 @@ index_metatable = function(remote)
if opts and opts.buffer then
error("index:min() doesn't support `buffer` argument")
end
- return remote:_request('get', opts, self.space.id, self.id,
+ return remote:_request('min', opts, self.space.id, self.id,
box.index.GE, 0, 1, key)
end
@@ -1184,7 +1184,7 @@ index_metatable = function(remote)
if opts and opts.buffer then
error("index:max() doesn't support `buffer` argument")
end
- return remote:_request('get', opts, self.space.id, self.id,
+ return remote:_request('max', opts, self.space.id, self.id,
box.index.LE, 0, 1, key)
end
^ permalink raw reply [flat|nested] 32+ messages in thread
* [tarantool-patches] Re: [PATCH 6/8] netbox: introduce fiber-async API
2018-04-23 12:31 ` [tarantool-patches] " Alexander Turenko
@ 2018-04-23 18:59 ` Vladislav Shpilevoy
0 siblings, 0 replies; 32+ messages in thread
From: Vladislav Shpilevoy @ 2018-04-23 18:59 UTC (permalink / raw)
To: tarantool-patches, Alexander Turenko
Hello. Thanks for review!
On 23/04/2018 15:31, Alexander Turenko wrote:
> Hi Vlad!
>
> Minor wording comments are below (Vlad asks to track it here).
>
> WBR, Alexander Turenko.
>
> On Mon, Apr 16, 2018 at 09:39:16PM +0300, Vladislav Shpilevoy wrote:
>> Now any netbox call blocks a caller-fiber until a result is read
>> from a socket, or time is out. To use it asynchronously it is
>> necessary to create a fiber per request. Sometimes it is
>> unwanted - for example if RPS is very high (for example, about
>> 100k), and latency is about 1 second. Or when it is neccessary
>> to send multiple requests in paralles and then collect responses
>> (map-reduce).
>
> Paralles -> parallel.
>
>> + function request_index:wait_result(timeout)
>> ...
>> + if not self:is_ready() then
>> + -- When a response is ready before timeout, the
>> + -- waiting client is waked up spuriously.
>
> Do you mean prematurely?
Fixed on branch.
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index 85f31b8e8..e3680006e 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -286,7 +286,7 @@ local function create_transport(host, port, user, password, callback,
end
if not self:is_ready() then
-- When a response is ready before timeout, the
- -- waiting client is waked up spuriously.
+ -- waiting client is waked up prematurely.
local old_client = self.client
self.client = fiber.self()
while timeout > 0 and not self:is_ready() do
^ permalink raw reply [flat|nested] 32+ messages in thread
* Re: [tarantool-patches] Re: [PATCH 6/8] netbox: introduce fiber-async API
2018-04-23 16:44 ` Vladimir Davydov
@ 2018-04-23 18:59 ` Vladislav Shpilevoy
2018-04-24 13:05 ` Vladimir Davydov
0 siblings, 1 reply; 32+ messages in thread
From: Vladislav Shpilevoy @ 2018-04-23 18:59 UTC (permalink / raw)
To: Vladimir Davydov; +Cc: tarantool-patches
Hello. Thanks for review!
On 23/04/2018 19:44, Vladimir Davydov wrote:
> On Mon, Apr 16, 2018 at 09:39:16PM +0300, Vladislav Shpilevoy wrote:
>> Now any netbox call blocks a caller-fiber until a result is read
>> from a socket, or time is out. To use it asynchronously it is
>> necessary to create a fiber per request. Sometimes it is
>> unwanted - for example if RPS is very high (for example, about
>> 100k), and latency is about 1 second. Or when it is neccessary
>> to send multiple requests in paralles and then collect responses
>> (map-reduce).
>>
>> The patch introduces a new option for all netbox requests:
>> is_async. With this option any called netbox method returns
>> immediately (but still yields for a moment) a 'future' object.
>>
>> By a future object a user can check if the request is finalized,
>> get a result or error, wait for a timeout, discard a response.
>>
>> Example of is_async usage:
>> future = conn:call(func, {params}, {..., is_async = true})
>> -- Do some work ...
>> if not future.is_ready() then
>> result, err = future:wait_result(timeout)
>> end
>> -- Or:
>> result, error = future:result()
>>
>> A future:result() and :wait_result() returns either an error or
>> a response in the same format, as the sync versions of the called
>> methods.
>>
>> Part of #3107
>> + -- It is possible that multiple fibers are waiting for
>> + -- a result. In such a case a first, who got it, must
>> + -- wakeup the previous waiting client. This one wakes
>> + -- up another. Another wakes up third one, etc.
>> + wakeup_client(old_client)
>
> This is rather difficult for understanding IMO. Can we use a fiber.cond
> instead?
Sure, we can. Done on the branch.
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index 1f4828a7e..9bbc047d5 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -233,12 +233,6 @@ local function create_transport(host, port, user, password, callback,
local send_buf = buffer.ibuf(buffer.READAHEAD)
local recv_buf = buffer.ibuf(buffer.READAHEAD)
- local function wakeup_client(client)
- if client and client:status() ~= 'dead' then
- client:wakeup()
- end
- end
-
--
-- Async request metamethods.
--
@@ -287,22 +281,14 @@ local function create_transport(host, port, user, password, callback,
if not self:is_ready() then
-- When a response is ready before timeout, the
-- waiting client is waked up prematurely.
- local old_client = self.client
- self.client = fiber.self()
while timeout > 0 and not self:is_ready() do
local ts = fiber.clock()
- state_cond:wait(timeout)
+ self.cond:wait(timeout)
timeout = timeout - (fiber.clock() - ts)
end
- self.client = old_client
if not self:is_ready() then
return nil, box.error.new(E_TIMEOUT)
end
- -- It is possible that multiple fibers are waiting for
- -- a result. In such a case a first, who got it, must
- -- wakeup the previous waiting client. This one wakes
- -- up another. Another wakes up third one, etc.
- wakeup_client(old_client)
end
return self:result()
end
@@ -333,6 +319,7 @@ local function create_transport(host, port, user, password, callback,
request.id = nil
request.errno = new_errno
request.response = new_error
+ request.cond:broadcast()
end
requests = {}
end
@@ -428,11 +415,15 @@ local function create_transport(host, port, user, password, callback,
local id = next_request_id
method_encoder[method](send_buf, id, schema_version, ...)
next_request_id = next_id(id)
+ -- Request has maximum 7 members:
+ -- method, schema_version, buffer, id, cond, errno,
+ -- response.
local request = setmetatable(table_new(0, 7), request_mt)
request.method = method
request.schema_version = schema_version
request.buffer = buffer
request.id = id
+ request.cond = fiber.cond()
requests[id] = request
return request
end
@@ -468,7 +459,7 @@ local function create_transport(host, port, user, password, callback,
assert(body_end == body_end_check, "invalid xrow length")
request.errno = band(status, IPROTO_ERRNO_MASK)
request.response = body[IPROTO_ERROR_KEY]
- wakeup_client(request.client)
+ request.cond:broadcast()
return
end
@@ -479,7 +470,7 @@ local function create_transport(host, port, user, password, callback,
local wpos = buffer:alloc(body_len)
ffi.copy(wpos, body_rpos, body_len)
request.response = tonumber(body_len)
- wakeup_client(request.client)
+ request.cond:broadcast()
return
end
@@ -490,7 +481,7 @@ local function create_transport(host, port, user, password, callback,
request.response, request.errno =
method_decoder[request.method](body[IPROTO_DATA_KEY])
end
- wakeup_client(request.client)
+ request.cond:broadcast()
end
local function new_request_id()
@@ -601,7 +592,7 @@ local function create_transport(host, port, user, password, callback,
request.id = nil
requests[rid] = nil
request.response = response
- wakeup_client(request.client)
+ request.cond:broadcast()
return console_sm(next_id(rid))
end
end
^ permalink raw reply [flat|nested] 32+ messages in thread
* Re: [tarantool-patches] Re: [PATCH 8/8] netbox: implement perform_request via async version
2018-04-23 16:47 ` Vladimir Davydov
@ 2018-04-23 19:00 ` Vladislav Shpilevoy
0 siblings, 0 replies; 32+ messages in thread
From: Vladislav Shpilevoy @ 2018-04-23 19:00 UTC (permalink / raw)
To: Vladimir Davydov; +Cc: tarantool-patches
Hello. Thanks for review!
On 23/04/2018 19:47, Vladimir Davydov wrote:
> On Mon, Apr 16, 2018 at 09:39:18PM +0300, Vladislav Shpilevoy wrote:
>> When async netbox was introduced, it is not needed to hold a
>> special sync implementation - it can be just async call + waiting
>> for a response.
>>
>> Closes #3107
>> ---
>> src/box/lua/net_box.lua | 26 +++++++++-----------------
>> 1 file changed, 9 insertions(+), 17 deletions(-)
>
> I'd squash this one with patch 6.
Done.
^ permalink raw reply [flat|nested] 32+ messages in thread
* Re: [tarantool-patches] Re: [PATCH 6/8] netbox: introduce fiber-async API
2018-04-23 18:59 ` [tarantool-patches] " Vladislav Shpilevoy
@ 2018-04-24 13:05 ` Vladimir Davydov
0 siblings, 0 replies; 32+ messages in thread
From: Vladimir Davydov @ 2018-04-24 13:05 UTC (permalink / raw)
To: Vladislav Shpilevoy; +Cc: tarantool-patches
On Mon, Apr 23, 2018 at 09:59:29PM +0300, Vladislav Shpilevoy wrote:
> Hello. Thanks for review!
>
> On 23/04/2018 19:44, Vladimir Davydov wrote:
> > On Mon, Apr 16, 2018 at 09:39:16PM +0300, Vladislav Shpilevoy wrote:
> > > Now any netbox call blocks a caller-fiber until a result is read
> > > from a socket, or time is out. To use it asynchronously it is
> > > necessary to create a fiber per request. Sometimes it is
> > > unwanted - for example if RPS is very high (for example, about
> > > 100k), and latency is about 1 second. Or when it is neccessary
> > > to send multiple requests in paralles and then collect responses
> > > (map-reduce).
> > >
> > > The patch introduces a new option for all netbox requests:
> > > is_async. With this option any called netbox method returns
> > > immediately (but still yields for a moment) a 'future' object.
> > >
> > > By a future object a user can check if the request is finalized,
> > > get a result or error, wait for a timeout, discard a response.
> > >
> > > Example of is_async usage:
> > > future = conn:call(func, {params}, {..., is_async = true})
> > > -- Do some work ...
> > > if not future.is_ready() then
> > > result, err = future:wait_result(timeout)
> > > end
> > > -- Or:
> > > result, error = future:result()
> > >
> > > A future:result() and :wait_result() returns either an error or
> > > a response in the same format, as the sync versions of the called
> > > methods.
> > >
> > > Part of #3107
> > > + -- It is possible that multiple fibers are waiting for
> > > + -- a result. In such a case a first, who got it, must
> > > + -- wakeup the previous waiting client. This one wakes
> > > + -- up another. Another wakes up third one, etc.
> > > + wakeup_client(old_client)
> >
> > This is rather difficult for understanding IMO. Can we use a fiber.cond
> > instead?
>
> Sure, we can. Done on the branch.
Thanks. The patch looks good to me now.
^ permalink raw reply [flat|nested] 32+ messages in thread
* Re: [tarantool-patches] Re: [PATCH 4/8] netbox: extend codec with 'decode' methods
2018-04-23 18:59 ` [tarantool-patches] " Vladislav Shpilevoy
@ 2018-04-24 13:16 ` Vladimir Davydov
0 siblings, 0 replies; 32+ messages in thread
From: Vladimir Davydov @ 2018-04-24 13:16 UTC (permalink / raw)
To: Vladislav Shpilevoy; +Cc: tarantool-patches
Ack
On Mon, Apr 23, 2018 at 09:59:13PM +0300, Vladislav Shpilevoy wrote:
> Hello. Thanks for review! See the diff at the end of letter.
> > > diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
> > > index 4ed2b375d..3868cdf1c 100644
> > > --- a/src/box/lua/net_box.lua
> > > +++ b/src/box/lua/net_box.lua
> > > @@ -50,7 +50,34 @@ local E_PROC_LUA = box.error.PROC_LUA
> > > -- utility tables
> > > local is_final_state = {closed = 1, error = 1}
> > > -local method_codec = {
> > > +
> > > +local function decode_nil(...) end
> >
> > > +local function decode_nothing(...) return ... end
> >
> > decode_nop, may be?
>
> Done.
>
> >
> > > +local function decode_one_tuple(response)
> > > + if response[1] then
> > > + return box.tuple.new(response[1])
> > > + end
> > > +end
> > > +local function decode_single_tuple(response)
> > > + if response[2] then
> > > + return nil, box.error.MORE_THAN_ONE_TUPLE
> > > + end
> > > + if response[1] then
> > > + return box.tuple.new(response[1])
> > > + end
> > > +end
> >
> > Do we really need this MORE_THAN_ONE_TUPLE error? Can't we use
> > decode_one_tuple in 'get' as well? I ask, because having two methods
> > called decode_one_tuple and decode_single_tuple is confusing IMO -
> > it's unclear what's the difference between them.
>
> I agree with you, it looks ugly, but IProto has no 'get' method, so
> we must emulate it via select(limit = 2) + check that a tuple is
> only one. I renamed decode_single to decode_unique. Maybe it looks
> not so bad.
>
> >
> > > +local function decode_select(response)
> > > + setmetatable(response, sequence_mt)
> > > + for i, v in pairs(response) do
> > > + response[i] = box.tuple.new(v)
> > > + end
> > > + return response
> > > +end
> > > +local function decode_count(response)
> > > + return response[1]
> > > +end
> > > +
> > > +local method_encoder = {
> > > ping = internal.encode_ping,
> > > call_16 = internal.encode_call_16,
> > > call_17 = internal.encode_call,
> > > @@ -61,6 +88,10 @@ local method_codec = {
> > > update = internal.encode_update,
> > > upsert = internal.encode_upsert,
> > > select = internal.encode_select,
> > > + get = internal.encode_select,
> > > + min = internal.encode_select,
> > > + max = internal.encode_select,
> > > + count = internal.encode_call,
> > > -- inject raw data into connection, used by console and tests
> > > inject = function(buf, id, schema_version, bytes)
> > > local ptr = buf:reserve(#bytes)
> > > @@ -69,6 +100,24 @@ local method_codec = {
> > > end
> > > }
> > > +local method_decoder = {
> > > + ping = decode_nil,
> > > + call_16 = decode_select,
> > > + call_17 = decode_nothing,
> > > + eval = decode_nothing,
> > > + insert = decode_one_tuple,
> > > + replace = decode_one_tuple,
> > > + delete = decode_one_tuple,
> > > + update = decode_one_tuple,
> > > + upsert = decode_nil,
> > > + select = decode_select,
> > > + get = decode_single_tuple,
> > > + min = decode_single_tuple,
> > > + max = decode_single_tuple,
> > > + count = decode_count,
> > > + inject = decode_nothing,
> > > +}
> > > +
> > > local function next_id(id) return band(id + 1, 0x7FFFFFFF) end
> > > --
> >
> > > @@ -1144,9 +1175,8 @@ index_metatable = function(remote)
> > > if opts and opts.buffer then
> > > error("index:min() doesn't support `buffer` argument")
> > > end
> > > - local res = remote:_request('select', opts, self.space.id, self.id,
> > > - box.index.GE, 0, 1, key)
> > > - return one_tuple(res)
> > > + return remote:_request('get', opts, self.space.id, self.id,
> >
> > Should be 'min'?
>
> Yes, did not notice that. Fixed on the branch.
>
> >
> > > + box.index.GE, 0, 1, key)
> > > end
> > > function methods:max(key, opts)
> > > @@ -1154,9 +1184,8 @@ index_metatable = function(remote)
> > > if opts and opts.buffer then
> > > error("index:max() doesn't support `buffer` argument")
> > > end
> > > - local res = remote:_request('select', opts, self.space.id, self.id,
> > > - box.index.LE, 0, 1, key)
> > > - return one_tuple(res)
> > > + return remote:_request('get', opts, self.space.id, self.id,
> >
> > Should be 'max'?
>
> Same, you are right. Fixed.
>
> See the diff:
>
> diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
> index 3868cdf1c..53f301356 100644
> --- a/src/box/lua/net_box.lua
> +++ b/src/box/lua/net_box.lua
> @@ -52,13 +52,13 @@ local E_PROC_LUA = box.error.PROC_LUA
> local is_final_state = {closed = 1, error = 1}
> local function decode_nil(...) end
> -local function decode_nothing(...) return ... end
> +local function decode_nop(...) return ... end
> local function decode_one_tuple(response)
> if response[1] then
> return box.tuple.new(response[1])
> end
> end
> -local function decode_single_tuple(response)
> +local function decode_unique_tuple(response)
> if response[2] then
> return nil, box.error.MORE_THAN_ONE_TUPLE
> end
> @@ -103,19 +103,19 @@ local method_encoder = {
> local method_decoder = {
> ping = decode_nil,
> call_16 = decode_select,
> - call_17 = decode_nothing,
> - eval = decode_nothing,
> + call_17 = decode_nop,
> + eval = decode_nop,
> insert = decode_one_tuple,
> replace = decode_one_tuple,
> delete = decode_one_tuple,
> update = decode_one_tuple,
> upsert = decode_nil,
> select = decode_select,
> - get = decode_single_tuple,
> - min = decode_single_tuple,
> - max = decode_single_tuple,
> + get = decode_unique_tuple,
> + min = decode_unique_tuple,
> + max = decode_unique_tuple,
> count = decode_count,
> - inject = decode_nothing,
> + inject = decode_nop,
> }
> local function next_id(id) return band(id + 1, 0x7FFFFFFF) end
> @@ -1175,7 +1175,7 @@ index_metatable = function(remote)
> if opts and opts.buffer then
> error("index:min() doesn't support `buffer` argument")
> end
> - return remote:_request('get', opts, self.space.id, self.id,
> + return remote:_request('min', opts, self.space.id, self.id,
> box.index.GE, 0, 1, key)
> end
> @@ -1184,7 +1184,7 @@ index_metatable = function(remote)
> if opts and opts.buffer then
> error("index:max() doesn't support `buffer` argument")
> end
> - return remote:_request('get', opts, self.space.id, self.id,
> + return remote:_request('max', opts, self.space.id, self.id,
> box.index.LE, 0, 1, key)
> end
>
^ permalink raw reply [flat|nested] 32+ messages in thread
* Re: [tarantool-patches] [PATCH 1/8] lua: fix box.error.raise
2018-04-16 18:39 ` [PATCH 1/8] lua: fix box.error.raise Vladislav Shpilevoy
2018-04-23 16:19 ` Vladimir Davydov
@ 2018-05-08 15:36 ` Konstantin Osipov
2018-05-08 17:24 ` [tarantool-patches] " Vladislav Shpilevoy
1 sibling, 1 reply; 32+ messages in thread
From: Konstantin Osipov @ 2018-05-08 15:36 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev
* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [18/04/16 21:44]:
> It did not work because raise is implemented as __index metatable
> member, and error() is __call metatable member. The second one
> takes additional implicit argument - self.
box.error.raise() is not documented.
And since it doesn't work, we can be sure it's not used.
Please remove it instead.
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
^ permalink raw reply [flat|nested] 32+ messages in thread
* Re: [tarantool-patches] [PATCH 2/8] lua: allow to create and error object with no throw
2018-04-16 18:39 ` [PATCH 2/8] lua: allow to create and error object with no throw Vladislav Shpilevoy
2018-04-23 16:20 ` Vladimir Davydov
@ 2018-05-08 15:37 ` Konstantin Osipov
1 sibling, 0 replies; 32+ messages in thread
From: Konstantin Osipov @ 2018-05-08 15:37 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev
* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [18/04/16 21:44]:
> Closes #3031
> ---
> src/box/lua/error.cc | 15 +++++++++++++++
> test/box/misc.result | 25 +++++++++++++++++++++++++
> test/box/misc.test.lua | 10 ++++++++++
> 3 files changed, 50 insertions(+)
OK to push.
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
^ permalink raw reply [flat|nested] 32+ messages in thread
* Re: [tarantool-patches] [PATCH 3/8] console: fix a bug in interactive readline usage
2018-04-16 18:39 ` [PATCH 3/8] console: fix a bug in interactive readline usage Vladislav Shpilevoy
2018-04-23 16:20 ` Vladimir Davydov
@ 2018-05-08 15:37 ` Konstantin Osipov
1 sibling, 0 replies; 32+ messages in thread
From: Konstantin Osipov @ 2018-05-08 15:37 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev
* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [18/04/16 21:44]:
> Spurious wakeups are possible in console, that makes readline
> think that there are some data on stdin. Waked up readline
> returns garbage instead of string, that crashes a server on
> assertion in Lua.
>
> Closes #3343
This should not have been on this patch set, I have pushed it
already into 1.9, please rebase to the latest trunk.
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
^ permalink raw reply [flat|nested] 32+ messages in thread
* Re: [tarantool-patches] [PATCH 4/8] netbox: extend codec with 'decode' methods
2018-04-16 18:39 ` [PATCH 4/8] netbox: extend codec with 'decode' methods Vladislav Shpilevoy
2018-04-23 16:42 ` Vladimir Davydov
@ 2018-05-08 15:49 ` Konstantin Osipov
2018-05-08 17:24 ` [tarantool-patches] " Vladislav Shpilevoy
1 sibling, 1 reply; 32+ messages in thread
From: Konstantin Osipov @ 2018-05-08 15:49 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev
* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [18/04/16 21:44]:
> Netbox has a table 'method_codec' that is used to encode a
> request by a method name. But a response is decoded out of codec.
> It leads to
> 1) decoding into Lua tables before decoding into tuples where
> needed - it is double decoding and produces a lot of garbage;
> 2) each method contains hacks like one_tuple(), or single tuple
> check.
>
> These things can not be fixed with no real codec instead of
> encoder only.
>
> Also global table with decoders is needed for #3107, where
> a request could be sent async with no fiber blocking. An async
> response when received already does not have a call context - it
> has only method name.
>
> Needed for #3107
> ---
> src/box/lua/net_box.lua | 116 +++++++++++++++++++++++++++++-------------------
> test/box/net.box.result | 14 ++++++
> test/box/sql.result | 2 +
> 3 files changed, 87 insertions(+), 45 deletions(-)
>
> diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
> index 4ed2b375d..3868cdf1c 100644
> --- a/src/box/lua/net_box.lua
> +++ b/src/box/lua/net_box.lua
> @@ -50,7 +50,34 @@ local E_PROC_LUA = box.error.PROC_LUA
>
> -- utility tables
> local is_final_state = {closed = 1, error = 1}
> -local method_codec = {
> +
> +local function decode_nil(...) end
> +local function decode_nothing(...) return ... end
decode_nothing -> decode_nop
> +local function decode_one_tuple(response)
> + if response[1] then
> + return box.tuple.new(response[1])
> + end
> +end
decode_one_tuple -> decode_tuple
Why do you need the if () guard?
> +local function decode_single_tuple(response)
> + if response[2] then
> + return nil, box.error.MORE_THAN_ONE_TUPLE
> + end
> + if response[1] then
> + return box.tuple.new(response[1])
> + end
> +end
decode_single_tuple -> decode_get()
> +local function decode_select(response)
> + setmetatable(response, sequence_mt)
> + for i, v in pairs(response) do
> + response[i] = box.tuple.new(v)
> + end
> + return response
> +end
Why are you using pairs() rather than ipairs()?
How does this avoid double decode as you claim in changeset
comment?
> +local function decode_count(response)
> + return response[1]
> +end
> +
> @@ -336,7 +385,10 @@ local function create_transport(host, port, user, password, callback,
> -- Decode xrow.body[DATA] to Lua objects
> body, body_end_check = decode(body_rpos)
> assert(body_end == body_end_check, "invalid xrow length")
> - return res -- the length of xrow.body
> - elseif not err then
> - setmetatable(res, sequence_mt)
> - local postproc = method ~= 'eval' and method ~= 'call_17'
> - if postproc then
> - local tnew = box.tuple.new
You removed this local variable from the decoder - it was here
for a reason. Please put it back in your implementation of decoder.
> diff --git a/test/box/net.box.result b/test/box/net.box.result
> index cf7b27f0b..6a3713fc0 100644
> --- a/test/box/net.box.result
> +++ b/test/box/net.box.result
> @@ -416,9 +416,11 @@ cn.space.net_box_test_space:select({234}, { iterator = 'LT' })
> ...
> cn.space.net_box_test_space:update({1}, { { '+', 2, 2 } })
> ---
> +- null
net.box calling convention should be the same as local box, if a
tuple is not found, it should return nothing, not null.
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
^ permalink raw reply [flat|nested] 32+ messages in thread
* Re: [tarantool-patches] [PATCH 5/8] test: fix unstable test
2018-04-16 18:39 ` [PATCH 5/8] test: fix unstable test Vladislav Shpilevoy
2018-04-22 5:32 ` [tarantool-patches] " Kirill Yukhin
@ 2018-05-08 15:50 ` Konstantin Osipov
1 sibling, 0 replies; 32+ messages in thread
From: Konstantin Osipov @ 2018-05-08 15:50 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev
* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [18/04/16 21:44]:
> ---
> test/box/errinj.result | 3 +++
> test/box/errinj.test.lua | 1 +
> 2 files changed, 4 insertions(+)
This has been pushed already, should not have been in this patch
set in the first place.
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
^ permalink raw reply [flat|nested] 32+ messages in thread
* Re: [tarantool-patches] [PATCH 7/8] netbox: remove schema_version from requests
2018-04-16 18:39 ` [PATCH 7/8] netbox: remove schema_version from requests Vladislav Shpilevoy
@ 2018-05-08 16:06 ` Konstantin Osipov
2018-05-08 17:24 ` [tarantool-patches] " Vladislav Shpilevoy
0 siblings, 1 reply; 32+ messages in thread
From: Konstantin Osipov @ 2018-05-08 16:06 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev
* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [18/04/16 21:44]:
> Schema_version was used in netbox to update local box-like
> schema. The box-like schema makes able to access spaces and
> indexes via connection object.
>
> It was updated each time, when a response from a server is
> received with a schema version non-equal to the local value.
>
> But there was no reason why a schema version is needed in a
> request. It leads to ER_WRONG_SCHEMA_VERSION error sometimes,
> but netbox on this error just resends the same request again. The
> same behaviour can be reached with just no sending any schema
> version to a server.
As far as I can see you removed all checks for schema_version and
never re-fetch the schema after a connection is established,
despite what you write in the comment.
Please explain how you ever check that schema has changed when
performing a request.
>
> Remove schema_version from request, and just track schema version
> changes in responses.
>
> Part of #3351
> Part of #3333
> Follow up #3107
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
^ permalink raw reply [flat|nested] 32+ messages in thread
* Re: [tarantool-patches] Re: [PATCH 1/8] lua: fix box.error.raise
2018-05-08 15:36 ` [tarantool-patches] " Konstantin Osipov
@ 2018-05-08 17:24 ` Vladislav Shpilevoy
0 siblings, 0 replies; 32+ messages in thread
From: Vladislav Shpilevoy @ 2018-05-08 17:24 UTC (permalink / raw)
To: tarantool-patches, Konstantin Osipov; +Cc: vdavydov.dev
Removed on the branch.
On 08/05/2018 18:36, Konstantin Osipov wrote:
> * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [18/04/16 21:44]:
>> It did not work because raise is implemented as __index metatable
>> member, and error() is __call metatable member. The second one
>> takes additional implicit argument - self.
>
> box.error.raise() is not documented.
>
> And since it doesn't work, we can be sure it's not used.
>
> Please remove it instead.
>
>
^ permalink raw reply [flat|nested] 32+ messages in thread
* [tarantool-patches] Re: [PATCH 7/8] netbox: remove schema_version from requests
2018-05-08 16:06 ` [tarantool-patches] " Konstantin Osipov
@ 2018-05-08 17:24 ` Vladislav Shpilevoy
0 siblings, 0 replies; 32+ messages in thread
From: Vladislav Shpilevoy @ 2018-05-08 17:24 UTC (permalink / raw)
To: tarantool-patches, Konstantin Osipov
On 08/05/2018 19:06, Konstantin Osipov wrote:
> * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [18/04/16 21:44]:
>
>> Schema_version was used in netbox to update local box-like
>> schema. The box-like schema makes able to access spaces and
>> indexes via connection object.
>>
>> It was updated each time, when a response from a server is
>> received with a schema version non-equal to the local value.
>>
>> But there was no reason why a schema version is needed in a
>> request. It leads to ER_WRONG_SCHEMA_VERSION error sometimes,
>> but netbox on this error just resends the same request again. The
>> same behaviour can be reached with just no sending any schema
>> version to a server.
>
> As far as I can see you removed all checks for schema_version and
> never re-fetch the schema after a connection is established,
> despite what you write in the comment.
>
> Please explain how you ever check that schema has changed when
> performing a request.
As I already explained you in Telegram, this check, even before
my commit, was done inside netbox state machine.
Look at the iproto_sm - this function on each response decodes the former
and checks schema version. You may think it is called only once from
iproto_schema_sm, that is called from iproto_auth_sm, but it is wrong.
Iproto_sm is just one state of the state machine. The machine can returns
into it again and again like this:
protocol_sm -> iproto_auth_sm -> iproto_schema_sm -> iproto_sm
-> iproto_sm ->
-> iproto_sm ->
...
-> iproto_sm ->
-> iproto_schema_sm ->
-> iproto_sm ->
...
These steps are executed inside the worker fiber.
^ permalink raw reply [flat|nested] 32+ messages in thread
* [tarantool-patches] Re: [PATCH 4/8] netbox: extend codec with 'decode' methods
2018-05-08 15:49 ` [tarantool-patches] " Konstantin Osipov
@ 2018-05-08 17:24 ` Vladislav Shpilevoy
0 siblings, 0 replies; 32+ messages in thread
From: Vladislav Shpilevoy @ 2018-05-08 17:24 UTC (permalink / raw)
To: tarantool-patches, Konstantin Osipov
Hello. Thanks for review.
On 08/05/2018 18:49, Konstantin Osipov wrote:
> * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [18/04/16 21:44]:
>> Netbox has a table 'method_codec' that is used to encode a
>> request by a method name. But a response is decoded out of codec.
>> It leads to
>> 1) decoding into Lua tables before decoding into tuples where
>> needed - it is double decoding and produces a lot of garbage;
>> 2) each method contains hacks like one_tuple(), or single tuple
>> check.
>>
>> These things can not be fixed with no real codec instead of
>> encoder only.
>>
>> Also global table with decoders is needed for #3107, where
>> a request could be sent async with no fiber blocking. An async
>> response when received already does not have a call context - it
>> has only method name.
>>
>> Needed for #3107
>> ---
>> src/box/lua/net_box.lua | 116 +++++++++++++++++++++++++++++-------------------
>> test/box/net.box.result | 14 ++++++
>> test/box/sql.result | 2 +
>> 3 files changed, 87 insertions(+), 45 deletions(-)
>>
>> diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
>> index 4ed2b375d..3868cdf1c 100644
>> --- a/src/box/lua/net_box.lua
>> +++ b/src/box/lua/net_box.lua
>> @@ -50,7 +50,34 @@ local E_PROC_LUA = box.error.PROC_LUA
>>
>> -- utility tables
>> local is_final_state = {closed = 1, error = 1}
>> -local method_codec = {
>> +
>> +local function decode_nil(...) end
>> +local function decode_nothing(...) return ... end
>
> decode_nothing -> decode_nop
You reviewed the old patch. Is is fixed already on Vova request.
>
>> +local function decode_one_tuple(response)
>> + if response[1] then
>> + return box.tuple.new(response[1])
>> + end
>> +end
>
> decode_one_tuple -> decode_tuple
Ok.
@@ -53,7 +53,7 @@ local is_final_state = {closed = 1, error = 1}
local function decode_nil(...) end
local function decode_nop(...) return ... end
-local function decode_one_tuple(response)
+local function decode_tuple(response)
if response[1] then
return box.tuple.new(response[1])
end
@@ -105,10 +105,10 @@ local method_decoder = {
call_16 = decode_select,
call_17 = decode_nop,
eval = decode_nop,
- insert = decode_one_tuple,
- replace = decode_one_tuple,
- delete = decode_one_tuple,
- update = decode_one_tuple,
+ insert = decode_tuple,
+ replace = decode_tuple,
+ delete = decode_tuple,
+ update = decode_tuple,
upsert = decode_nil,
select = decode_select,
get = decode_unique_tuple,
>
> Why do you need the if () guard?
Body can be empty - in such a case there is no sense to allocate
cdata.
>
>> +local function decode_single_tuple(response)
>> + if response[2] then
>> + return nil, box.error.MORE_THAN_ONE_TUPLE
>> + end
>> + if response[1] then
>> + return box.tuple.new(response[1])
>> + end
>> +end
>
> decode_single_tuple -> decode_get()
Ok.
@@ -58,7 +58,7 @@ local function decode_tuple(response)
return box.tuple.new(response[1])
end
end
-local function decode_unique_tuple(response)
+local function decode_get(response)
if response[2] then
return nil, box.error.MORE_THAN_ONE_TUPLE
end
@@ -111,9 +111,9 @@ local method_decoder = {
update = decode_tuple,
upsert = decode_nil,
select = decode_select,
- get = decode_unique_tuple,
- min = decode_unique_tuple,
- max = decode_unique_tuple,
+ get = decode_get,
+ min = decode_get,
+ max = decode_get,
count = decode_count,
inject = decode_nop,
>
>> +local function decode_select(response)
>> + setmetatable(response, sequence_mt)
>> + for i, v in pairs(response) do
>> + response[i] = box.tuple.new(v)
>> + end
>> + return response
>> +end
>
> Why are you using pairs() rather than ipairs()?
Pairs is faster.
https://stackoverflow.com/questions/8955085/should-i-use-ipairs-or-a-for-loop
>
> How does this avoid double decode as you claim in changeset
> comment?
In no way. I did not said, that I fix it here. I just note, that it is a
problem, solution of which requires separate decoders.
>
>> +local function decode_count(response)
>> + return response[1]
>> +end
>> +
>> @@ -336,7 +385,10 @@ local function create_transport(host, port, user, password, callback,
>> -- Decode xrow.body[DATA] to Lua objects
>> body, body_end_check = decode(body_rpos)
>> assert(body_end == body_end_check, "invalid xrow length")
>> - return res -- the length of xrow.body
>> - elseif not err then
>> - setmetatable(res, sequence_mt)
>> - local postproc = method ~= 'eval' and method ~= 'call_17'
>> - if postproc then
>> - local tnew = box.tuple.new
>
> You removed this local variable from the decoder - it was here
> for a reason. Please put it back in your implementation of decoder.
Ok.
@@ -68,8 +68,9 @@ local function decode_unique_tuple(response)
end
local function decode_select(response)
setmetatable(response, sequence_mt)
+ local tnew box.tuple.new
for i, v in pairs(response) do
- response[i] = box.tuple.new(v)
+ response[i] = tnew(v)
end
return response
end
>
>> diff --git a/test/box/net.box.result b/test/box/net.box.result
>> index cf7b27f0b..6a3713fc0 100644
>> --- a/test/box/net.box.result
>> +++ b/test/box/net.box.result
>> @@ -416,9 +416,11 @@ cn.space.net_box_test_space:select({234}, { iterator = 'LT' })
>> ...
>> cn.space.net_box_test_space:update({1}, { { '+', 2, 2 } })
>> ---
>> +- null
>
> net.box calling convention should be the same as local box, if a
> tuple is not found, it should return nothing, not null.
>
Ok.
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index 0e2485f66..f6eef56fb 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -68,7 +68,7 @@ local function decode_get(response)
end
local function decode_select(response)
setmetatable(response, sequence_mt)
- local tnew box.tuple.new
+ local tnew = box.tuple.new
for i, v in pairs(response) do
response[i] = tnew(v)
end
@@ -1099,6 +1099,12 @@ function console_methods:eval(line, timeout)
return res[1] or res
end
+local function nothing_or_data(value)
+ if value ~= nil then
+ return value
+ end
+end
+
space_metatable = function(remote)
local methods = {}
@@ -1129,7 +1135,8 @@ space_metatable = function(remote)
function methods:upsert(key, oplist, opts)
check_space_arg(self, 'upsert')
- return remote:_request('upsert', opts, self.id, key, oplist)
+ return nothing_or_data(remote:_request('upsert', opts, self.id, key,
+ oplist))
end
function methods:get(key, opts)
@@ -1167,8 +1174,8 @@ index_metatable = function(remote)
if opts and opts.buffer then
error("index:get() doesn't support `buffer` argument")
end
- return remote:_request('get', opts, self.space.id, self.id,
- box.index.EQ, 0, 2, key)
+ return nothing_or_data(remote:_request('get', opts, self.space.id,
+ self.id, box.index.EQ, 0, 2, key))
end
function methods:min(key, opts)
@@ -1176,8 +1183,9 @@ index_metatable = function(remote)
if opts and opts.buffer then
error("index:min() doesn't support `buffer` argument")
end
- return remote:_request('min', opts, self.space.id, self.id,
- box.index.GE, 0, 1, key)
+ return nothing_or_data(remote:_request('min', opts, self.space.id,
+ self.id, box.index.GE, 0, 1,
+ key))
end
function methods:max(key, opts)
@@ -1185,8 +1193,9 @@ index_metatable = function(remote)
if opts and opts.buffer then
error("index:max() doesn't support `buffer` argument")
end
- return remote:_request('max', opts, self.space.id, self.id,
- box.index.LE, 0, 1, key)
+ return nothing_or_data(remote:_request('max', opts, self.space.id,
+ self.id, box.index.LE, 0, 1,
+ key))
end
function methods:count(key, opts)
@@ -1201,13 +1210,14 @@ index_metatable = function(remote)
function methods:delete(key, opts)
check_index_arg(self, 'delete')
- return remote:_request('delete', opts, self.space.id, self.id, key)
+ return nothing_or_data(remote:_request('delete', opts, self.space.id,
+ self.id, key))
end
function methods:update(key, oplist, opts)
check_index_arg(self, 'update')
- return remote:_request('update', opts, self.space.id, self.id, key,
- oplist)
+ return nothing_or_data(remote:_request('update', opts, self.space.id,
+ self.id, key, oplist))
end
return { __index = methods, __metatable = false }
diff --git a/test/box/net.box.result b/test/box/net.box.result
index 6a3713fc0..4fbc70ec6 100644
--- a/test/box/net.box.result
+++ b/test/box/net.box.result
@@ -416,11 +416,9 @@ cn.space.net_box_test_space:select({234}, { iterator = 'LT' })
...
cn.space.net_box_test_space:update({1}, { { '+', 2, 2 } })
---
-- null
...
cn.space.net_box_test_space:delete{1}
---
-- null
...
cn.space.net_box_test_space:delete{2}
---
@@ -428,7 +426,6 @@ cn.space.net_box_test_space:delete{2}
...
cn.space.net_box_test_space:delete{2}
---
-- null
...
-- test one-based indexing in splice operation (see update.test.lua)
cn.space.net_box_test_space:replace({10, 'abcde'})
@@ -757,15 +754,12 @@ remote_space:upsert({3}, {}, { timeout = 1e-9 })
...
remote_space:upsert({4}, {})
---
-- null
...
remote_space:upsert({5}, {}, { timeout = 1.00 })
---
-- null
...
remote_space:upsert({3}, {})
---
-- null
...
remote_space:update({3}, {}, { timeout = 1e-9 })
---
@@ -987,15 +981,12 @@ _ = remote_pk:delete({5})
...
remote_space:get(0)
---
-- null
...
remote_space:get(1)
---
-- null
...
remote_space:get(2)
---
-- null
...
remote_space = nil
---
@@ -1327,7 +1318,6 @@ c.space.test:select{}
...
c.space.test:upsert({1, 2, 'nothing'}, {{'+', 2, 1}}) -- common update
---
-- null
...
c.space.test:select{}
---
@@ -1335,7 +1325,6 @@ c.space.test:select{}
...
c.space.test:upsert({2, 4, 'something'}, {{'+', 2, 1}}) -- insert
---
-- null
...
c.space.test:select{}
---
@@ -1344,7 +1333,6 @@ c.space.test:select{}
...
c.space.test:upsert({2, 4, 'nothing'}, {{'+', 3, 100500}}) -- wrong operation
---
-- null
...
c.space.test:select{}
---
diff --git a/test/box/sql.result b/test/box/sql.result
index 95f8da7dd..11a698850 100644
--- a/test/box/sql.result
+++ b/test/box/sql.result
@@ -105,7 +105,6 @@ space:select{1}
-- xxx: update comes through, returns 0 rows affected
space:update(1, {{'=', 2, 'I am a new tuple'}})
---
-- null
...
-- nothing is selected, since nothing was there
space:select{1}
@@ -209,7 +208,6 @@ space:delete(0)
...
space:delete(4294967295)
---
-- null
...
box.space.test:drop()
---
^ permalink raw reply [flat|nested] 32+ messages in thread
end of thread, other threads:[~2018-05-08 17:24 UTC | newest]
Thread overview: 32+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2018-04-16 18:39 [PATCH 0/8] netbox: introduce fiber-async API Vladislav Shpilevoy
2018-04-16 18:39 ` [PATCH 1/8] lua: fix box.error.raise Vladislav Shpilevoy
2018-04-23 16:19 ` Vladimir Davydov
2018-05-08 15:36 ` [tarantool-patches] " Konstantin Osipov
2018-05-08 17:24 ` [tarantool-patches] " Vladislav Shpilevoy
2018-04-16 18:39 ` [PATCH 2/8] lua: allow to create and error object with no throw Vladislav Shpilevoy
2018-04-23 16:20 ` Vladimir Davydov
2018-05-08 15:37 ` [tarantool-patches] " Konstantin Osipov
2018-04-16 18:39 ` [PATCH 3/8] console: fix a bug in interactive readline usage Vladislav Shpilevoy
2018-04-23 16:20 ` Vladimir Davydov
2018-05-08 15:37 ` [tarantool-patches] " Konstantin Osipov
2018-04-16 18:39 ` [PATCH 4/8] netbox: extend codec with 'decode' methods Vladislav Shpilevoy
2018-04-23 16:42 ` Vladimir Davydov
2018-04-23 18:59 ` [tarantool-patches] " Vladislav Shpilevoy
2018-04-24 13:16 ` Vladimir Davydov
2018-05-08 15:49 ` [tarantool-patches] " Konstantin Osipov
2018-05-08 17:24 ` [tarantool-patches] " Vladislav Shpilevoy
2018-04-16 18:39 ` [PATCH 5/8] test: fix unstable test Vladislav Shpilevoy
2018-04-22 5:32 ` [tarantool-patches] " Kirill Yukhin
2018-05-08 15:50 ` Konstantin Osipov
2018-04-16 18:39 ` [PATCH 6/8] netbox: introduce fiber-async API Vladislav Shpilevoy
2018-04-23 12:31 ` [tarantool-patches] " Alexander Turenko
2018-04-23 18:59 ` Vladislav Shpilevoy
2018-04-23 16:44 ` Vladimir Davydov
2018-04-23 18:59 ` [tarantool-patches] " Vladislav Shpilevoy
2018-04-24 13:05 ` Vladimir Davydov
2018-04-16 18:39 ` [PATCH 7/8] netbox: remove schema_version from requests Vladislav Shpilevoy
2018-05-08 16:06 ` [tarantool-patches] " Konstantin Osipov
2018-05-08 17:24 ` [tarantool-patches] " Vladislav Shpilevoy
2018-04-16 18:39 ` [PATCH 8/8] netbox: implement perform_request via async version Vladislav Shpilevoy
2018-04-23 16:47 ` Vladimir Davydov
2018-04-23 19:00 ` [tarantool-patches] " Vladislav Shpilevoy
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox