* [PATCH 1/8] lua: fix box.error.raise
2018-04-16 18:39 [PATCH 0/8] netbox: introduce fiber-async API Vladislav Shpilevoy
@ 2018-04-16 18:39 ` Vladislav Shpilevoy
2018-04-23 16:19 ` Vladimir Davydov
2018-05-08 15:36 ` [tarantool-patches] " Konstantin Osipov
2018-04-16 18:39 ` [PATCH 2/8] lua: allow to create and error object with no throw Vladislav Shpilevoy
` (6 subsequent siblings)
7 siblings, 2 replies; 32+ messages in thread
From: Vladislav Shpilevoy @ 2018-04-16 18:39 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev
box.error.raise() did not work because raise is implemented as an
__index metatable member, while box.error() is the __call metatable
member. The latter takes an additional implicit argument - self.
---
src/box/lua/error.cc | 78 +++++++++++++++++++++++++++++++++-----------------
test/box/misc.result | 39 +++++++++++++++++++++++++
test/box/misc.test.lua | 18 ++++++++++++
3 files changed, 108 insertions(+), 27 deletions(-)
diff --git a/src/box/lua/error.cc b/src/box/lua/error.cc
index 314907421..56cc2c563 100644
--- a/src/box/lua/error.cc
+++ b/src/box/lua/error.cc
@@ -42,25 +42,18 @@ extern "C" {
#include "lua/utils.h"
#include "box/error.h"
-static int
-luaT_error_raise(lua_State *L)
+static void
+luaT_error_create(lua_State *L, int top, int top_base)
{
uint32_t code = 0;
const char *reason = NULL;
const char *file = "";
unsigned line = 0;
lua_Debug info;
- /* lua_type(L, 1) == LUA_TTABLE - box.error table */
- int top = lua_gettop(L);
- if (top <= 1) {
- /* re-throw saved exceptions (if any) */
- if (box_error_last())
- luaT_error(L);
- return 0;
- } else if (top >= 2 && lua_type(L, 2) == LUA_TNUMBER) {
- code = lua_tonumber(L, 2);
+ if (top >= top_base && lua_type(L, top_base) == LUA_TNUMBER) {
+ code = lua_tonumber(L, top_base);
reason = tnt_errcode_desc(code);
- if (top > 2) {
+ if (top > top_base) {
/* Call string.format(reason, ...) to format message */
lua_getglobal(L, "string");
if (lua_isnil(L, -1))
@@ -69,24 +62,29 @@ luaT_error_raise(lua_State *L)
if (lua_isnil(L, -1))
goto raise;
lua_pushstring(L, reason);
- for (int i = 3; i <= top; i++)
+ for (int i = top_base + 1; i <= top; i++)
lua_pushvalue(L, i);
- lua_call(L, top - 1, 1);
+ lua_call(L, top - top_base + 1, 1);
reason = lua_tostring(L, -1);
} else if (strchr(reason, '%') != NULL) {
/* Missing arguments to format string */
luaL_error(L, "box.error(): bad arguments");
}
- } else if (top == 2 && lua_istable(L, 2)) {
- /* A special case that rethrows raw error (used by net.box) */
- lua_getfield(L, 2, "code");
- code = lua_tonumber(L, -1);
- lua_pop(L, 1);
- lua_getfield(L, 2, "reason");
- reason = lua_tostring(L, -1);
- if (reason == NULL)
- reason = "";
- lua_pop(L, 1);
+ } else if (top == top_base) {
+ if (lua_istable(L, top_base)) {
+ /* A special case that rethrows raw error (used by net.box) */
+ lua_getfield(L, top_base, "code");
+ code = lua_tonumber(L, -1);
+ lua_pop(L, 1);
+ lua_getfield(L, top_base, "reason");
+ reason = lua_tostring(L, -1);
+ if (reason == NULL)
+ reason = "";
+ lua_pop(L, 1);
+ } else if (luaL_iserror(L, top_base)) {
+ lua_error(L);
+ return;
+ }
} else {
luaL_error(L, "box.error(): bad arguments");
}
@@ -104,8 +102,34 @@ raise:
}
say_debug("box.error() at %s:%i", file, line);
box_error_set(file, line, code, "%s", reason);
- luaT_error(L);
- return 0;
+}
+
+static int
+luaT_error_call(lua_State *L)
+{
+ int top = lua_gettop(L);
+ if (top <= 1) {
+ /* Re-throw saved exceptions if any. */
+ if (box_error_last())
+ luaT_error(L);
+ return 0;
+ }
+ luaT_error_create(L, top, 2);
+ return luaT_error(L);
+}
+
+static int
+luaT_error_raise(lua_State *L)
+{
+ int top = lua_gettop(L);
+ if (top == 0) {
+ /* Re-throw saved exceptions if any. */
+ if (box_error_last())
+ luaT_error(L);
+ return 0;
+ }
+ luaT_error_create(L, top, 1);
+ return luaT_error(L);
}
static int
@@ -214,7 +238,7 @@ box_lua_error_init(struct lua_State *L) {
}
lua_newtable(L);
{
- lua_pushcfunction(L, luaT_error_raise);
+ lua_pushcfunction(L, luaT_error_call);
lua_setfield(L, -2, "__call");
lua_newtable(L);
diff --git a/test/box/misc.result b/test/box/misc.result
index 57717c4fe..2102e4a1c 100644
--- a/test/box/misc.result
+++ b/test/box/misc.result
@@ -176,6 +176,45 @@ box.error(box.error.UNSUPPORTED)
---
- error: 'box.error(): bad arguments'
...
+--
+-- box.error.raise not worked because it is __index method of
+-- box.error and box.error() is the __call method. The second
+-- one takes itself as the first argument.
+--
+box.error(box.error.CREATE_SPACE, "space", "error")
+---
+- error: 'Failed to create space ''space'': error'
+...
+box.error()
+---
+- error: 'Failed to create space ''space'': error'
+...
+box.error.raise()
+---
+- error: 'Failed to create space ''space'': error'
+...
+box.error.raise(box.error.CREATE_SPACE, "space", "error")
+---
+- error: 'Failed to create space ''space'': error'
+...
+box.error.raise(box.error.UNKNOWN)
+---
+- error: Unknown error
+...
+--
+-- Allow to rethrow error.
+--
+_, err = pcall(box.error, box.error.UNKNOWN)
+---
+...
+box.error(err)
+---
+- error: Unknown error
+...
+box.error.raise(err)
+---
+- error: Unknown error
+...
----------------
-- # box.stat
----------------
diff --git a/test/box/misc.test.lua b/test/box/misc.test.lua
index b7bf600c3..299dc830f 100644
--- a/test/box/misc.test.lua
+++ b/test/box/misc.test.lua
@@ -52,6 +52,24 @@ box.error(box.error.UNSUPPORTED, "x", "x%s")
box.error(box.error.UNSUPPORTED, "x")
box.error(box.error.UNSUPPORTED)
+--
+-- box.error.raise not worked because it is __index method of
+-- box.error and box.error() is the __call method. The second
+-- one takes itself as the first argument.
+--
+box.error(box.error.CREATE_SPACE, "space", "error")
+box.error()
+box.error.raise()
+box.error.raise(box.error.CREATE_SPACE, "space", "error")
+box.error.raise(box.error.UNKNOWN)
+
+--
+-- Allow to rethrow error.
+--
+_, err = pcall(box.error, box.error.UNKNOWN)
+box.error(err)
+box.error.raise(err)
+
----------------
-- # box.stat
----------------
--
2.15.1 (Apple Git-101)
^ permalink raw reply [flat|nested] 32+ messages in thread
* Re: [PATCH 1/8] lua: fix box.error.raise
2018-04-16 18:39 ` [PATCH 1/8] lua: fix box.error.raise Vladislav Shpilevoy
@ 2018-04-23 16:19 ` Vladimir Davydov
2018-05-08 15:36 ` [tarantool-patches] " Konstantin Osipov
1 sibling, 0 replies; 32+ messages in thread
From: Vladimir Davydov @ 2018-04-23 16:19 UTC (permalink / raw)
To: Vladislav Shpilevoy; +Cc: tarantool-patches
On Mon, Apr 16, 2018 at 09:39:11PM +0300, Vladislav Shpilevoy wrote:
> It did not work because raise is implemented as __index metatable
> member, and error() is __call metatable member. The second one
> takes additional implicit argument - self.
> ---
> src/box/lua/error.cc | 78 +++++++++++++++++++++++++++++++++-----------------
> test/box/misc.result | 39 +++++++++++++++++++++++++
> test/box/misc.test.lua | 18 ++++++++++++
> 3 files changed, 108 insertions(+), 27 deletions(-)
Ack
^ permalink raw reply [flat|nested] 32+ messages in thread
* Re: [tarantool-patches] [PATCH 1/8] lua: fix box.error.raise
2018-04-16 18:39 ` [PATCH 1/8] lua: fix box.error.raise Vladislav Shpilevoy
2018-04-23 16:19 ` Vladimir Davydov
@ 2018-05-08 15:36 ` Konstantin Osipov
2018-05-08 17:24 ` [tarantool-patches] " Vladislav Shpilevoy
1 sibling, 1 reply; 32+ messages in thread
From: Konstantin Osipov @ 2018-05-08 15:36 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev
* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [18/04/16 21:44]:
> It did not work because raise is implemented as __index metatable
> member, and error() is __call metatable member. The second one
> takes additional implicit argument - self.
box.error.raise() is not documented.
And since it doesn't work, we can be sure it's not used.
Please remove it instead.
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
^ permalink raw reply [flat|nested] 32+ messages in thread
* Re: [tarantool-patches] Re: [PATCH 1/8] lua: fix box.error.raise
2018-05-08 15:36 ` [tarantool-patches] " Konstantin Osipov
@ 2018-05-08 17:24 ` Vladislav Shpilevoy
0 siblings, 0 replies; 32+ messages in thread
From: Vladislav Shpilevoy @ 2018-05-08 17:24 UTC (permalink / raw)
To: tarantool-patches, Konstantin Osipov; +Cc: vdavydov.dev
Removed on the branch.
On 08/05/2018 18:36, Konstantin Osipov wrote:
> * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [18/04/16 21:44]:
>> It did not work because raise is implemented as __index metatable
>> member, and error() is __call metatable member. The second one
>> takes additional implicit argument - self.
>
> box.error.raise() is not documented.
>
> And since it doesn't work, we can be sure it's not used.
>
> Please remove it instead.
>
>
^ permalink raw reply [flat|nested] 32+ messages in thread
* [PATCH 2/8] lua: allow to create and error object with no throw
2018-04-16 18:39 [PATCH 0/8] netbox: introduce fiber-async API Vladislav Shpilevoy
2018-04-16 18:39 ` [PATCH 1/8] lua: fix box.error.raise Vladislav Shpilevoy
@ 2018-04-16 18:39 ` Vladislav Shpilevoy
2018-04-23 16:20 ` Vladimir Davydov
2018-05-08 15:37 ` [tarantool-patches] " Konstantin Osipov
2018-04-16 18:39 ` [PATCH 3/8] console: fix a bug in interactive readline usage Vladislav Shpilevoy
` (5 subsequent siblings)
7 siblings, 2 replies; 32+ messages in thread
From: Vladislav Shpilevoy @ 2018-04-16 18:39 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev
It is needed to return an error via the 'nil, error_object' notation,
and to store an error object in order to return it later.
Closes #3031
---
src/box/lua/error.cc | 15 +++++++++++++++
test/box/misc.result | 25 +++++++++++++++++++++++++
test/box/misc.test.lua | 10 ++++++++++
3 files changed, 50 insertions(+)
diff --git a/src/box/lua/error.cc b/src/box/lua/error.cc
index 56cc2c563..2a56f1a12 100644
--- a/src/box/lua/error.cc
+++ b/src/box/lua/error.cc
@@ -148,6 +148,17 @@ luaT_error_last(lua_State *L)
return 1;
}
+static int
+luaT_error_new(lua_State *L)
+{
+ int top = lua_gettop(L);
+ if (top == 0)
+ return luaL_error(L, "Usage: box.error.new(code, args)");
+ luaT_error_create(L, top, 1);
+ lua_settop(L, 0);
+ return luaT_error_last(L);
+}
+
static int
luaT_error_clear(lua_State *L)
{
@@ -254,6 +265,10 @@ box_lua_error_init(struct lua_State *L) {
lua_pushcfunction(L, luaT_error_raise);
lua_setfield(L, -2, "raise");
}
+ {
+ lua_pushcfunction(L, luaT_error_new);
+ lua_setfield(L, -2, "new");
+ }
lua_setfield(L, -2, "__index");
}
lua_setmetatable(L, -2);
diff --git a/test/box/misc.result b/test/box/misc.result
index 2102e4a1c..4b0f0e53d 100644
--- a/test/box/misc.result
+++ b/test/box/misc.result
@@ -215,6 +215,31 @@ box.error.raise(err)
---
- error: Unknown error
...
+--
+-- gh-3031: allow to create an error object with no throwing it.
+--
+e = box.error.new(box.error.UNKNOWN)
+---
+...
+e
+---
+- Unknown error
+...
+e = box.error.new(box.error.CREATE_SPACE, "space", "error")
+---
+...
+e
+---
+- 'Failed to create space ''space'': error'
+...
+box.error.new()
+---
+- error: 'Usage: box.error.new(code, args)'
+...
+box.error.raise()
+---
+- error: 'Failed to create space ''space'': error'
+...
----------------
-- # box.stat
----------------
diff --git a/test/box/misc.test.lua b/test/box/misc.test.lua
index 299dc830f..33900d24e 100644
--- a/test/box/misc.test.lua
+++ b/test/box/misc.test.lua
@@ -70,6 +70,16 @@ _, err = pcall(box.error, box.error.UNKNOWN)
box.error(err)
box.error.raise(err)
+--
+-- gh-3031: allow to create an error object with no throwing it.
+--
+e = box.error.new(box.error.UNKNOWN)
+e
+e = box.error.new(box.error.CREATE_SPACE, "space", "error")
+e
+box.error.new()
+box.error.raise()
+
----------------
-- # box.stat
----------------
--
2.15.1 (Apple Git-101)
^ permalink raw reply [flat|nested] 32+ messages in thread
* Re: [PATCH 2/8] lua: allow to create and error object with no throw
2018-04-16 18:39 ` [PATCH 2/8] lua: allow to create and error object with no throw Vladislav Shpilevoy
@ 2018-04-23 16:20 ` Vladimir Davydov
2018-05-08 15:37 ` [tarantool-patches] " Konstantin Osipov
1 sibling, 0 replies; 32+ messages in thread
From: Vladimir Davydov @ 2018-04-23 16:20 UTC (permalink / raw)
To: Vladislav Shpilevoy; +Cc: tarantool-patches
On Mon, Apr 16, 2018 at 09:39:12PM +0300, Vladislav Shpilevoy wrote:
> It is needed to return error via 'nil, error_object' notation,
> and to store an error object to return it later.
>
> Closes #3031
> ---
> src/box/lua/error.cc | 15 +++++++++++++++
> test/box/misc.result | 25 +++++++++++++++++++++++++
> test/box/misc.test.lua | 10 ++++++++++
> 3 files changed, 50 insertions(+)
Ack
^ permalink raw reply [flat|nested] 32+ messages in thread
* Re: [tarantool-patches] [PATCH 2/8] lua: allow to create and error object with no throw
2018-04-16 18:39 ` [PATCH 2/8] lua: allow to create and error object with no throw Vladislav Shpilevoy
2018-04-23 16:20 ` Vladimir Davydov
@ 2018-05-08 15:37 ` Konstantin Osipov
1 sibling, 0 replies; 32+ messages in thread
From: Konstantin Osipov @ 2018-05-08 15:37 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev
* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [18/04/16 21:44]:
> Closes #3031
> ---
> src/box/lua/error.cc | 15 +++++++++++++++
> test/box/misc.result | 25 +++++++++++++++++++++++++
> test/box/misc.test.lua | 10 ++++++++++
> 3 files changed, 50 insertions(+)
OK to push.
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
^ permalink raw reply [flat|nested] 32+ messages in thread
* [PATCH 3/8] console: fix a bug in interactive readline usage
2018-04-16 18:39 [PATCH 0/8] netbox: introduce fiber-async API Vladislav Shpilevoy
2018-04-16 18:39 ` [PATCH 1/8] lua: fix box.error.raise Vladislav Shpilevoy
2018-04-16 18:39 ` [PATCH 2/8] lua: allow to create and error object with no throw Vladislav Shpilevoy
@ 2018-04-16 18:39 ` Vladislav Shpilevoy
2018-04-23 16:20 ` Vladimir Davydov
2018-05-08 15:37 ` [tarantool-patches] " Konstantin Osipov
2018-04-16 18:39 ` [PATCH 4/8] netbox: extend codec with 'decode' methods Vladislav Shpilevoy
` (4 subsequent siblings)
7 siblings, 2 replies; 32+ messages in thread
From: Vladislav Shpilevoy @ 2018-04-16 18:39 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev
Spurious wakeups are possible in the console, which makes readline
think that there is some data on stdin. The woken-up readline
returns garbage instead of a string, which crashes the server on an
assertion in Lua.
Closes #3343
---
src/box/lua/console.c | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
diff --git a/src/box/lua/console.c b/src/box/lua/console.c
index d27d7ecac..7a5ac5550 100644
--- a/src/box/lua/console.c
+++ b/src/box/lua/console.c
@@ -221,8 +221,9 @@ lbox_console_readline(struct lua_State *L)
*/
rl_callback_handler_install(prompt, console_push_line);
top = lua_gettop(L);
- while (top == lua_gettop(L) &&
- coio_wait(STDIN_FILENO, COIO_READ, TIMEOUT_INFINITY)) {
+ while (top == lua_gettop(L)) {
+ while (coio_wait(STDIN_FILENO, COIO_READ,
+ TIMEOUT_INFINITY) == 0);
rl_callback_read_char();
}
--
2.15.1 (Apple Git-101)
^ permalink raw reply [flat|nested] 32+ messages in thread
* Re: [PATCH 3/8] console: fix a bug in interactive readline usage
2018-04-16 18:39 ` [PATCH 3/8] console: fix a bug in interactive readline usage Vladislav Shpilevoy
@ 2018-04-23 16:20 ` Vladimir Davydov
2018-05-08 15:37 ` [tarantool-patches] " Konstantin Osipov
1 sibling, 0 replies; 32+ messages in thread
From: Vladimir Davydov @ 2018-04-23 16:20 UTC (permalink / raw)
To: Vladislav Shpilevoy; +Cc: tarantool-patches
On Mon, Apr 16, 2018 at 09:39:13PM +0300, Vladislav Shpilevoy wrote:
> Spurious wakeups are possible in console, that makes readline
> think that there are some data on stdin. Waked up readline
> returns garbage instead of string, that crashes a server on
> assertion in Lua.
>
> Closes #3343
> ---
> src/box/lua/console.c | 5 +++--
> 1 file changed, 3 insertions(+), 2 deletions(-)
Ack
^ permalink raw reply [flat|nested] 32+ messages in thread
* Re: [tarantool-patches] [PATCH 3/8] console: fix a bug in interactive readline usage
2018-04-16 18:39 ` [PATCH 3/8] console: fix a bug in interactive readline usage Vladislav Shpilevoy
2018-04-23 16:20 ` Vladimir Davydov
@ 2018-05-08 15:37 ` Konstantin Osipov
1 sibling, 0 replies; 32+ messages in thread
From: Konstantin Osipov @ 2018-05-08 15:37 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev
* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [18/04/16 21:44]:
> Spurious wakeups are possible in console, that makes readline
> think that there are some data on stdin. Waked up readline
> returns garbage instead of string, that crashes a server on
> assertion in Lua.
>
> Closes #3343
This should not have been on this patch set, I have pushed it
already into 1.9, please rebase to the latest trunk.
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
^ permalink raw reply [flat|nested] 32+ messages in thread
* [PATCH 4/8] netbox: extend codec with 'decode' methods
2018-04-16 18:39 [PATCH 0/8] netbox: introduce fiber-async API Vladislav Shpilevoy
` (2 preceding siblings ...)
2018-04-16 18:39 ` [PATCH 3/8] console: fix a bug in interactive readline usage Vladislav Shpilevoy
@ 2018-04-16 18:39 ` Vladislav Shpilevoy
2018-04-23 16:42 ` Vladimir Davydov
2018-05-08 15:49 ` [tarantool-patches] " Konstantin Osipov
2018-04-16 18:39 ` [PATCH 5/8] test: fix unstable test Vladislav Shpilevoy
` (3 subsequent siblings)
7 siblings, 2 replies; 32+ messages in thread
From: Vladislav Shpilevoy @ 2018-04-16 18:39 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev
Netbox has a table 'method_codec' that is used to encode a
request by a method name. But a response is decoded outside of the
codec. It leads to
1) decoding into Lua tables before decoding into tuples where
needed - it is double decoding and produces a lot of garbage;
2) each method containing hacks like one_tuple(), or a single tuple
check.
These things can not be fixed without a real codec in place of
the encoder-only table.
Also a global table with decoders is needed for #3107, where
a request could be sent asynchronously without blocking a fiber. When
an async response is received, it no longer has a call context - it
has only the method name.
Needed for #3107
---
src/box/lua/net_box.lua | 116 +++++++++++++++++++++++++++++-------------------
test/box/net.box.result | 14 ++++++
test/box/sql.result | 2 +
3 files changed, 87 insertions(+), 45 deletions(-)
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index 4ed2b375d..3868cdf1c 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -50,7 +50,34 @@ local E_PROC_LUA = box.error.PROC_LUA
-- utility tables
local is_final_state = {closed = 1, error = 1}
-local method_codec = {
+
+local function decode_nil(...) end
+local function decode_nothing(...) return ... end
+local function decode_one_tuple(response)
+ if response[1] then
+ return box.tuple.new(response[1])
+ end
+end
+local function decode_single_tuple(response)
+ if response[2] then
+ return nil, box.error.MORE_THAN_ONE_TUPLE
+ end
+ if response[1] then
+ return box.tuple.new(response[1])
+ end
+end
+local function decode_select(response)
+ setmetatable(response, sequence_mt)
+ for i, v in pairs(response) do
+ response[i] = box.tuple.new(v)
+ end
+ return response
+end
+local function decode_count(response)
+ return response[1]
+end
+
+local method_encoder = {
ping = internal.encode_ping,
call_16 = internal.encode_call_16,
call_17 = internal.encode_call,
@@ -61,6 +88,10 @@ local method_codec = {
update = internal.encode_update,
upsert = internal.encode_upsert,
select = internal.encode_select,
+ get = internal.encode_select,
+ min = internal.encode_select,
+ max = internal.encode_select,
+ count = internal.encode_call,
-- inject raw data into connection, used by console and tests
inject = function(buf, id, schema_version, bytes)
local ptr = buf:reserve(#bytes)
@@ -69,6 +100,24 @@ local method_codec = {
end
}
+local method_decoder = {
+ ping = decode_nil,
+ call_16 = decode_select,
+ call_17 = decode_nothing,
+ eval = decode_nothing,
+ insert = decode_one_tuple,
+ replace = decode_one_tuple,
+ delete = decode_one_tuple,
+ update = decode_one_tuple,
+ upsert = decode_nil,
+ select = decode_select,
+ get = decode_single_tuple,
+ min = decode_single_tuple,
+ max = decode_single_tuple,
+ count = decode_count,
+ inject = decode_nothing,
+}
+
local function next_id(id) return band(id + 1, 0x7FFFFFFF) end
--
@@ -278,7 +327,7 @@ local function create_transport(host, port, user, password, callback,
worker_fiber:wakeup()
end
local id = next_request_id
- method_codec[method](send_buf, id, schema_version, ...)
+ method_encoder[method](send_buf, id, schema_version, ...)
next_request_id = next_id(id)
local request = table_new(0, 6) -- reserve space for 6 keys
request.client = fiber_self()
@@ -336,7 +385,10 @@ local function create_transport(host, port, user, password, callback,
-- Decode xrow.body[DATA] to Lua objects
body, body_end_check = decode(body_rpos)
assert(body_end == body_end_check, "invalid xrow length")
- request.response = body[IPROTO_DATA_KEY]
+ if body and body[IPROTO_DATA_KEY] then
+ request.response, request.errno =
+ method_decoder[request.method](body[IPROTO_DATA_KEY])
+ end
wakeup_client(request.client)
end
@@ -417,7 +469,7 @@ local function create_transport(host, port, user, password, callback,
log.warn("Netbox text protocol support is deprecated since 1.10, "..
"please use require('console').connect() instead")
local setup_delimiter = 'require("console").delimiter("$EOF$")\n'
- method_codec.inject(send_buf, nil, nil, setup_delimiter)
+ method_encoder.inject(send_buf, nil, nil, setup_delimiter)
local err, response = send_and_recv_console()
if err then
return error_sm(err, response)
@@ -835,18 +887,8 @@ function remote_methods:_request(method, opts, ...)
end
err, res = perform_request(timeout, buffer, method,
self.schema_version, ...)
- if not err and buffer ~= nil then
- return res -- the length of xrow.body
- elseif not err then
- setmetatable(res, sequence_mt)
- local postproc = method ~= 'eval' and method ~= 'call_17'
- if postproc then
- local tnew = box.tuple.new
- for i, v in pairs(res) do
- res[i] = tnew(v)
- end
- end
- return res -- decoded xrow.body[DATA]
+ if not err then
+ return res
elseif err == E_WRONG_SCHEMA_VERSION then
err = nil
end
@@ -1056,25 +1098,17 @@ function console_methods:eval(line, timeout)
return res[1] or res
end
-local function one_tuple(tab)
- if type(tab) ~= 'table' then
- return tab
- elseif tab[1] ~= nil then
- return tab[1]
- end
-end
-
space_metatable = function(remote)
local methods = {}
function methods:insert(tuple, opts)
check_space_arg(self, 'insert')
- return one_tuple(remote:_request('insert', opts, self.id, tuple))
+ return remote:_request('insert', opts, self.id, tuple)
end
function methods:replace(tuple, opts)
check_space_arg(self, 'replace')
- return one_tuple(remote:_request('replace', opts, self.id, tuple))
+ return remote:_request('replace', opts, self.id, tuple)
end
function methods:select(key, opts)
@@ -1094,8 +1128,7 @@ space_metatable = function(remote)
function methods:upsert(key, oplist, opts)
check_space_arg(self, 'upsert')
- remote:_request('upsert', opts, self.id, key, oplist)
- return
+ return remote:_request('upsert', opts, self.id, key, oplist)
end
function methods:get(key, opts)
@@ -1133,10 +1166,8 @@ index_metatable = function(remote)
if opts and opts.buffer then
error("index:get() doesn't support `buffer` argument")
end
- local res = remote:_request('select', opts, self.space.id, self.id,
- box.index.EQ, 0, 2, key)
- if res[2] ~= nil then box.error(box.error.MORE_THAN_ONE_TUPLE) end
- if res[1] ~= nil then return res[1] end
+ return remote:_request('get', opts, self.space.id, self.id,
+ box.index.EQ, 0, 2, key)
end
function methods:min(key, opts)
@@ -1144,9 +1175,8 @@ index_metatable = function(remote)
if opts and opts.buffer then
error("index:min() doesn't support `buffer` argument")
end
- local res = remote:_request('select', opts, self.space.id, self.id,
- box.index.GE, 0, 1, key)
- return one_tuple(res)
+ return remote:_request('get', opts, self.space.id, self.id,
+ box.index.GE, 0, 1, key)
end
function methods:max(key, opts)
@@ -1154,9 +1184,8 @@ index_metatable = function(remote)
if opts and opts.buffer then
error("index:max() doesn't support `buffer` argument")
end
- local res = remote:_request('select', opts, self.space.id, self.id,
- box.index.LE, 0, 1, key)
- return one_tuple(res)
+ return remote:_request('get', opts, self.space.id, self.id,
+ box.index.LE, 0, 1, key)
end
function methods:count(key, opts)
@@ -1166,21 +1195,18 @@ index_metatable = function(remote)
end
local code = string.format('box.space.%s.index.%s:count',
self.space.name, self.name)
- return remote:_request('call_16', opts, code, { key })[1][1]
+ return remote:_request('count', opts, code, { key })
end
function methods:delete(key, opts)
check_index_arg(self, 'delete')
- local res = remote:_request('delete', opts, self.space.id, self.id,
- key)
- return one_tuple(res)
+ return remote:_request('delete', opts, self.space.id, self.id, key)
end
function methods:update(key, oplist, opts)
check_index_arg(self, 'update')
- local res = remote:_request('update', opts, self.space.id, self.id,
- key, oplist)
- return one_tuple(res)
+ return remote:_request('update', opts, self.space.id, self.id, key,
+ oplist)
end
return { __index = methods, __metatable = false }
diff --git a/test/box/net.box.result b/test/box/net.box.result
index cf7b27f0b..6a3713fc0 100644
--- a/test/box/net.box.result
+++ b/test/box/net.box.result
@@ -416,9 +416,11 @@ cn.space.net_box_test_space:select({234}, { iterator = 'LT' })
...
cn.space.net_box_test_space:update({1}, { { '+', 2, 2 } })
---
+- null
...
cn.space.net_box_test_space:delete{1}
---
+- null
...
cn.space.net_box_test_space:delete{2}
---
@@ -426,6 +428,7 @@ cn.space.net_box_test_space:delete{2}
...
cn.space.net_box_test_space:delete{2}
---
+- null
...
-- test one-based indexing in splice operation (see update.test.lua)
cn.space.net_box_test_space:replace({10, 'abcde'})
@@ -754,12 +757,15 @@ remote_space:upsert({3}, {}, { timeout = 1e-9 })
...
remote_space:upsert({4}, {})
---
+- null
...
remote_space:upsert({5}, {}, { timeout = 1.00 })
---
+- null
...
remote_space:upsert({3}, {})
---
+- null
...
remote_space:update({3}, {}, { timeout = 1e-9 })
---
@@ -981,12 +987,15 @@ _ = remote_pk:delete({5})
...
remote_space:get(0)
---
+- null
...
remote_space:get(1)
---
+- null
...
remote_space:get(2)
---
+- null
...
remote_space = nil
---
@@ -1318,6 +1327,7 @@ c.space.test:select{}
...
c.space.test:upsert({1, 2, 'nothing'}, {{'+', 2, 1}}) -- common update
---
+- null
...
c.space.test:select{}
---
@@ -1325,6 +1335,7 @@ c.space.test:select{}
...
c.space.test:upsert({2, 4, 'something'}, {{'+', 2, 1}}) -- insert
---
+- null
...
c.space.test:select{}
---
@@ -1333,6 +1344,7 @@ c.space.test:select{}
...
c.space.test:upsert({2, 4, 'nothing'}, {{'+', 3, 100500}}) -- wrong operation
---
+- null
...
c.space.test:select{}
---
@@ -1481,6 +1493,7 @@ result
-- upsert
c.space.test:upsert({4}, {}, {buffer = ibuf})
---
+- 7
...
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
---
@@ -1492,6 +1505,7 @@ result
-- delete
c.space.test:upsert({4}, {}, {buffer = ibuf})
---
+- 7
...
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
---
diff --git a/test/box/sql.result b/test/box/sql.result
index 11a698850..95f8da7dd 100644
--- a/test/box/sql.result
+++ b/test/box/sql.result
@@ -105,6 +105,7 @@ space:select{1}
-- xxx: update comes through, returns 0 rows affected
space:update(1, {{'=', 2, 'I am a new tuple'}})
---
+- null
...
-- nothing is selected, since nothing was there
space:select{1}
@@ -208,6 +209,7 @@ space:delete(0)
...
space:delete(4294967295)
---
+- null
...
box.space.test:drop()
---
--
2.15.1 (Apple Git-101)
^ permalink raw reply [flat|nested] 32+ messages in thread
* Re: [PATCH 4/8] netbox: extend codec with 'decode' methods
2018-04-16 18:39 ` [PATCH 4/8] netbox: extend codec with 'decode' methods Vladislav Shpilevoy
@ 2018-04-23 16:42 ` Vladimir Davydov
2018-04-23 18:59 ` [tarantool-patches] " Vladislav Shpilevoy
2018-05-08 15:49 ` [tarantool-patches] " Konstantin Osipov
1 sibling, 1 reply; 32+ messages in thread
From: Vladimir Davydov @ 2018-04-23 16:42 UTC (permalink / raw)
To: Vladislav Shpilevoy; +Cc: tarantool-patches
On Mon, Apr 16, 2018 at 09:39:14PM +0300, Vladislav Shpilevoy wrote:
> Netbox has a table 'method_codec' that is used to encode a
> request by a method name. But a response is decoded out of codec.
> It leads to
> 1) decoding into Lua tables before decoding into tuples where
> needed - it is double decoding and produces a lot of garbage;
> 2) each method contains hacks like one_tuple(), or single tuple
> check.
>
> These things can not be fixed with no real codec instead of
> encoder only.
>
> Also global table with decoders is needed for #3107, where
> a request could be sent async with no fiber blocking. An async
> response when received already does not have a call context - it
> has only method name.
>
> Needed for #3107
> ---
> src/box/lua/net_box.lua | 116 +++++++++++++++++++++++++++++-------------------
> test/box/net.box.result | 14 ++++++
> test/box/sql.result | 2 +
> 3 files changed, 87 insertions(+), 45 deletions(-)
>
> diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
> index 4ed2b375d..3868cdf1c 100644
> --- a/src/box/lua/net_box.lua
> +++ b/src/box/lua/net_box.lua
> @@ -50,7 +50,34 @@ local E_PROC_LUA = box.error.PROC_LUA
>
> -- utility tables
> local is_final_state = {closed = 1, error = 1}
> -local method_codec = {
> +
> +local function decode_nil(...) end
> +local function decode_nothing(...) return ... end
decode_nop, may be?
> +local function decode_one_tuple(response)
> + if response[1] then
> + return box.tuple.new(response[1])
> + end
> +end
> +local function decode_single_tuple(response)
> + if response[2] then
> + return nil, box.error.MORE_THAN_ONE_TUPLE
> + end
> + if response[1] then
> + return box.tuple.new(response[1])
> + end
> +end
Do we really need this MORE_THAN_ONE_TUPLE error? Can't we use
decode_one_tuple in 'get' as well? I ask, because having two methods
called decode_one_tuple and decode_single_tuple is confusing IMO -
it's unclear what's the difference between them.
> +local function decode_select(response)
> + setmetatable(response, sequence_mt)
> + for i, v in pairs(response) do
> + response[i] = box.tuple.new(v)
> + end
> + return response
> +end
> +local function decode_count(response)
> + return response[1]
> +end
> +
> +local method_encoder = {
> ping = internal.encode_ping,
> call_16 = internal.encode_call_16,
> call_17 = internal.encode_call,
> @@ -61,6 +88,10 @@ local method_codec = {
> update = internal.encode_update,
> upsert = internal.encode_upsert,
> select = internal.encode_select,
> + get = internal.encode_select,
> + min = internal.encode_select,
> + max = internal.encode_select,
> + count = internal.encode_call,
> -- inject raw data into connection, used by console and tests
> inject = function(buf, id, schema_version, bytes)
> local ptr = buf:reserve(#bytes)
> @@ -69,6 +100,24 @@ local method_codec = {
> end
> }
>
> +local method_decoder = {
> + ping = decode_nil,
> + call_16 = decode_select,
> + call_17 = decode_nothing,
> + eval = decode_nothing,
> + insert = decode_one_tuple,
> + replace = decode_one_tuple,
> + delete = decode_one_tuple,
> + update = decode_one_tuple,
> + upsert = decode_nil,
> + select = decode_select,
> + get = decode_single_tuple,
> + min = decode_single_tuple,
> + max = decode_single_tuple,
> + count = decode_count,
> + inject = decode_nothing,
> +}
> +
> local function next_id(id) return band(id + 1, 0x7FFFFFFF) end
>
> --
> @@ -1144,9 +1175,8 @@ index_metatable = function(remote)
> if opts and opts.buffer then
> error("index:min() doesn't support `buffer` argument")
> end
> - local res = remote:_request('select', opts, self.space.id, self.id,
> - box.index.GE, 0, 1, key)
> - return one_tuple(res)
> + return remote:_request('get', opts, self.space.id, self.id,
Should be 'min'?
> + box.index.GE, 0, 1, key)
> end
>
> function methods:max(key, opts)
> @@ -1154,9 +1184,8 @@ index_metatable = function(remote)
> if opts and opts.buffer then
> error("index:max() doesn't support `buffer` argument")
> end
> - local res = remote:_request('select', opts, self.space.id, self.id,
> - box.index.LE, 0, 1, key)
> - return one_tuple(res)
> + return remote:_request('get', opts, self.space.id, self.id,
Should be 'max'?
> + box.index.LE, 0, 1, key)
> end
>
> function methods:count(key, opts)
^ permalink raw reply [flat|nested] 32+ messages in thread
* Re: [tarantool-patches] Re: [PATCH 4/8] netbox: extend codec with 'decode' methods
2018-04-23 16:42 ` Vladimir Davydov
@ 2018-04-23 18:59 ` Vladislav Shpilevoy
2018-04-24 13:16 ` Vladimir Davydov
0 siblings, 1 reply; 32+ messages in thread
From: Vladislav Shpilevoy @ 2018-04-23 18:59 UTC (permalink / raw)
To: Vladimir Davydov; +Cc: tarantool-patches
Hello. Thanks for review! See the diff at the end of letter.
>> diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
>> index 4ed2b375d..3868cdf1c 100644
>> --- a/src/box/lua/net_box.lua
>> +++ b/src/box/lua/net_box.lua
>> @@ -50,7 +50,34 @@ local E_PROC_LUA = box.error.PROC_LUA
>>
>> -- utility tables
>> local is_final_state = {closed = 1, error = 1}
>> -local method_codec = {
>> +
>> +local function decode_nil(...) end
>
>> +local function decode_nothing(...) return ... end
>
> decode_nop, may be?
Done.
>
>> +local function decode_one_tuple(response)
>> + if response[1] then
>> + return box.tuple.new(response[1])
>> + end
>> +end
>> +local function decode_single_tuple(response)
>> + if response[2] then
>> + return nil, box.error.MORE_THAN_ONE_TUPLE
>> + end
>> + if response[1] then
>> + return box.tuple.new(response[1])
>> + end
>> +end
>
> Do we really need this MORE_THAN_ONE_TUPLE error? Can't we use
> decode_one_tuple in 'get' as well? I ask, because having two methods
> called decode_one_tuple and decode_single_tuple is confusing IMO -
> it's unclear what's the difference between them.
I agree with you, it looks ugly, but IProto has no 'get' method, so
we must emulate it via select(limit = 2) + check that a tuple is
only one. I renamed decode_single to decode_unique. Maybe it looks
not so bad.
>
>> +local function decode_select(response)
>> + setmetatable(response, sequence_mt)
>> + for i, v in pairs(response) do
>> + response[i] = box.tuple.new(v)
>> + end
>> + return response
>> +end
>> +local function decode_count(response)
>> + return response[1]
>> +end
>> +
>> +local method_encoder = {
>> ping = internal.encode_ping,
>> call_16 = internal.encode_call_16,
>> call_17 = internal.encode_call,
>> @@ -61,6 +88,10 @@ local method_codec = {
>> update = internal.encode_update,
>> upsert = internal.encode_upsert,
>> select = internal.encode_select,
>> + get = internal.encode_select,
>> + min = internal.encode_select,
>> + max = internal.encode_select,
>> + count = internal.encode_call,
>> -- inject raw data into connection, used by console and tests
>> inject = function(buf, id, schema_version, bytes)
>> local ptr = buf:reserve(#bytes)
>> @@ -69,6 +100,24 @@ local method_codec = {
>> end
>> }
>>
>> +local method_decoder = {
>> + ping = decode_nil,
>> + call_16 = decode_select,
>> + call_17 = decode_nothing,
>> + eval = decode_nothing,
>> + insert = decode_one_tuple,
>> + replace = decode_one_tuple,
>> + delete = decode_one_tuple,
>> + update = decode_one_tuple,
>> + upsert = decode_nil,
>> + select = decode_select,
>> + get = decode_single_tuple,
>> + min = decode_single_tuple,
>> + max = decode_single_tuple,
>> + count = decode_count,
>> + inject = decode_nothing,
>> +}
>> +
>> local function next_id(id) return band(id + 1, 0x7FFFFFFF) end
>>
>> --
>
>> @@ -1144,9 +1175,8 @@ index_metatable = function(remote)
>> if opts and opts.buffer then
>> error("index:min() doesn't support `buffer` argument")
>> end
>> - local res = remote:_request('select', opts, self.space.id, self.id,
>> - box.index.GE, 0, 1, key)
>> - return one_tuple(res)
>> + return remote:_request('get', opts, self.space.id, self.id,
>
> Should be 'min'?
Yes, did not notice that. Fixed on the branch.
>
>> + box.index.GE, 0, 1, key)
>> end
>>
>> function methods:max(key, opts)
>> @@ -1154,9 +1184,8 @@ index_metatable = function(remote)
>> if opts and opts.buffer then
>> error("index:max() doesn't support `buffer` argument")
>> end
>> - local res = remote:_request('select', opts, self.space.id, self.id,
>> - box.index.LE, 0, 1, key)
>> - return one_tuple(res)
>> + return remote:_request('get', opts, self.space.id, self.id,
>
> Should be 'max'?
Same, you are right. Fixed.
See the diff:
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index 3868cdf1c..53f301356 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -52,13 +52,13 @@ local E_PROC_LUA = box.error.PROC_LUA
local is_final_state = {closed = 1, error = 1}
local function decode_nil(...) end
-local function decode_nothing(...) return ... end
+local function decode_nop(...) return ... end
local function decode_one_tuple(response)
if response[1] then
return box.tuple.new(response[1])
end
end
-local function decode_single_tuple(response)
+local function decode_unique_tuple(response)
if response[2] then
return nil, box.error.MORE_THAN_ONE_TUPLE
end
@@ -103,19 +103,19 @@ local method_encoder = {
local method_decoder = {
ping = decode_nil,
call_16 = decode_select,
- call_17 = decode_nothing,
- eval = decode_nothing,
+ call_17 = decode_nop,
+ eval = decode_nop,
insert = decode_one_tuple,
replace = decode_one_tuple,
delete = decode_one_tuple,
update = decode_one_tuple,
upsert = decode_nil,
select = decode_select,
- get = decode_single_tuple,
- min = decode_single_tuple,
- max = decode_single_tuple,
+ get = decode_unique_tuple,
+ min = decode_unique_tuple,
+ max = decode_unique_tuple,
count = decode_count,
- inject = decode_nothing,
+ inject = decode_nop,
}
local function next_id(id) return band(id + 1, 0x7FFFFFFF) end
@@ -1175,7 +1175,7 @@ index_metatable = function(remote)
if opts and opts.buffer then
error("index:min() doesn't support `buffer` argument")
end
- return remote:_request('get', opts, self.space.id, self.id,
+ return remote:_request('min', opts, self.space.id, self.id,
box.index.GE, 0, 1, key)
end
@@ -1184,7 +1184,7 @@ index_metatable = function(remote)
if opts and opts.buffer then
error("index:max() doesn't support `buffer` argument")
end
- return remote:_request('get', opts, self.space.id, self.id,
+ return remote:_request('max', opts, self.space.id, self.id,
box.index.LE, 0, 1, key)
end
^ permalink raw reply [flat|nested] 32+ messages in thread
* Re: [tarantool-patches] Re: [PATCH 4/8] netbox: extend codec with 'decode' methods
2018-04-23 18:59 ` [tarantool-patches] " Vladislav Shpilevoy
@ 2018-04-24 13:16 ` Vladimir Davydov
0 siblings, 0 replies; 32+ messages in thread
From: Vladimir Davydov @ 2018-04-24 13:16 UTC (permalink / raw)
To: Vladislav Shpilevoy; +Cc: tarantool-patches
Ack
On Mon, Apr 23, 2018 at 09:59:13PM +0300, Vladislav Shpilevoy wrote:
> Hello. Thanks for review! See the diff at the end of letter.
> > > diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
> > > index 4ed2b375d..3868cdf1c 100644
> > > --- a/src/box/lua/net_box.lua
> > > +++ b/src/box/lua/net_box.lua
> > > @@ -50,7 +50,34 @@ local E_PROC_LUA = box.error.PROC_LUA
> > > -- utility tables
> > > local is_final_state = {closed = 1, error = 1}
> > > -local method_codec = {
> > > +
> > > +local function decode_nil(...) end
> >
> > > +local function decode_nothing(...) return ... end
> >
> > decode_nop, may be?
>
> Done.
>
> >
> > > +local function decode_one_tuple(response)
> > > + if response[1] then
> > > + return box.tuple.new(response[1])
> > > + end
> > > +end
> > > +local function decode_single_tuple(response)
> > > + if response[2] then
> > > + return nil, box.error.MORE_THAN_ONE_TUPLE
> > > + end
> > > + if response[1] then
> > > + return box.tuple.new(response[1])
> > > + end
> > > +end
> >
> > Do we really need this MORE_THAN_ONE_TUPLE error? Can't we use
> > decode_one_tuple in 'get' as well? I ask, because having two methods
> > called decode_one_tuple and decode_single_tuple is confusing IMO -
> > it's unclear what's the difference between them.
>
> I agree with you, it looks ugly, but IProto has no 'get' method, so
> we must emulate it via select(limit = 2) + check that a tuple is
> only one. I renamed decode_single to decode_unique. Maybe it looks
> not so bad.
>
> >
> > > +local function decode_select(response)
> > > + setmetatable(response, sequence_mt)
> > > + for i, v in pairs(response) do
> > > + response[i] = box.tuple.new(v)
> > > + end
> > > + return response
> > > +end
> > > +local function decode_count(response)
> > > + return response[1]
> > > +end
> > > +
> > > +local method_encoder = {
> > > ping = internal.encode_ping,
> > > call_16 = internal.encode_call_16,
> > > call_17 = internal.encode_call,
> > > @@ -61,6 +88,10 @@ local method_codec = {
> > > update = internal.encode_update,
> > > upsert = internal.encode_upsert,
> > > select = internal.encode_select,
> > > + get = internal.encode_select,
> > > + min = internal.encode_select,
> > > + max = internal.encode_select,
> > > + count = internal.encode_call,
> > > -- inject raw data into connection, used by console and tests
> > > inject = function(buf, id, schema_version, bytes)
> > > local ptr = buf:reserve(#bytes)
> > > @@ -69,6 +100,24 @@ local method_codec = {
> > > end
> > > }
> > > +local method_decoder = {
> > > + ping = decode_nil,
> > > + call_16 = decode_select,
> > > + call_17 = decode_nothing,
> > > + eval = decode_nothing,
> > > + insert = decode_one_tuple,
> > > + replace = decode_one_tuple,
> > > + delete = decode_one_tuple,
> > > + update = decode_one_tuple,
> > > + upsert = decode_nil,
> > > + select = decode_select,
> > > + get = decode_single_tuple,
> > > + min = decode_single_tuple,
> > > + max = decode_single_tuple,
> > > + count = decode_count,
> > > + inject = decode_nothing,
> > > +}
> > > +
> > > local function next_id(id) return band(id + 1, 0x7FFFFFFF) end
> > > --
> >
> > > @@ -1144,9 +1175,8 @@ index_metatable = function(remote)
> > > if opts and opts.buffer then
> > > error("index:min() doesn't support `buffer` argument")
> > > end
> > > - local res = remote:_request('select', opts, self.space.id, self.id,
> > > - box.index.GE, 0, 1, key)
> > > - return one_tuple(res)
> > > + return remote:_request('get', opts, self.space.id, self.id,
> >
> > Should be 'min'?
>
> Yes, did not notice that. Fixed on the branch.
>
> >
> > > + box.index.GE, 0, 1, key)
> > > end
> > > function methods:max(key, opts)
> > > @@ -1154,9 +1184,8 @@ index_metatable = function(remote)
> > > if opts and opts.buffer then
> > > error("index:max() doesn't support `buffer` argument")
> > > end
> > > - local res = remote:_request('select', opts, self.space.id, self.id,
> > > - box.index.LE, 0, 1, key)
> > > - return one_tuple(res)
> > > + return remote:_request('get', opts, self.space.id, self.id,
> >
> > Should be 'max'?
>
> Same, you are right. Fixed.
>
> See the diff:
>
> diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
> index 3868cdf1c..53f301356 100644
> --- a/src/box/lua/net_box.lua
> +++ b/src/box/lua/net_box.lua
> @@ -52,13 +52,13 @@ local E_PROC_LUA = box.error.PROC_LUA
> local is_final_state = {closed = 1, error = 1}
> local function decode_nil(...) end
> -local function decode_nothing(...) return ... end
> +local function decode_nop(...) return ... end
> local function decode_one_tuple(response)
> if response[1] then
> return box.tuple.new(response[1])
> end
> end
> -local function decode_single_tuple(response)
> +local function decode_unique_tuple(response)
> if response[2] then
> return nil, box.error.MORE_THAN_ONE_TUPLE
> end
> @@ -103,19 +103,19 @@ local method_encoder = {
> local method_decoder = {
> ping = decode_nil,
> call_16 = decode_select,
> - call_17 = decode_nothing,
> - eval = decode_nothing,
> + call_17 = decode_nop,
> + eval = decode_nop,
> insert = decode_one_tuple,
> replace = decode_one_tuple,
> delete = decode_one_tuple,
> update = decode_one_tuple,
> upsert = decode_nil,
> select = decode_select,
> - get = decode_single_tuple,
> - min = decode_single_tuple,
> - max = decode_single_tuple,
> + get = decode_unique_tuple,
> + min = decode_unique_tuple,
> + max = decode_unique_tuple,
> count = decode_count,
> - inject = decode_nothing,
> + inject = decode_nop,
> }
> local function next_id(id) return band(id + 1, 0x7FFFFFFF) end
> @@ -1175,7 +1175,7 @@ index_metatable = function(remote)
> if opts and opts.buffer then
> error("index:min() doesn't support `buffer` argument")
> end
> - return remote:_request('get', opts, self.space.id, self.id,
> + return remote:_request('min', opts, self.space.id, self.id,
> box.index.GE, 0, 1, key)
> end
> @@ -1184,7 +1184,7 @@ index_metatable = function(remote)
> if opts and opts.buffer then
> error("index:max() doesn't support `buffer` argument")
> end
> - return remote:_request('get', opts, self.space.id, self.id,
> + return remote:_request('max', opts, self.space.id, self.id,
> box.index.LE, 0, 1, key)
> end
>
^ permalink raw reply [flat|nested] 32+ messages in thread
* Re: [tarantool-patches] [PATCH 4/8] netbox: extend codec with 'decode' methods
2018-04-16 18:39 ` [PATCH 4/8] netbox: extend codec with 'decode' methods Vladislav Shpilevoy
2018-04-23 16:42 ` Vladimir Davydov
@ 2018-05-08 15:49 ` Konstantin Osipov
2018-05-08 17:24 ` [tarantool-patches] " Vladislav Shpilevoy
1 sibling, 1 reply; 32+ messages in thread
From: Konstantin Osipov @ 2018-05-08 15:49 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev
* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [18/04/16 21:44]:
> Netbox has a table 'method_codec' that is used to encode a
> request by a method name. But a response is decoded out of codec.
> It leads to
> 1) decoding into Lua tables before decoding into tuples where
> needed - it is double decoding and produces a lot of garbage;
> 2) each method contains hacks like one_tuple(), or single tuple
> check.
>
> These things can not be fixed with no real codec instead of
> encoder only.
>
> Also global table with decoders is needed for #3107, where
> a request could be sent async with no fiber blocking. An async
> response when received already does not have a call context - it
> has only method name.
>
> Needed for #3107
> ---
> src/box/lua/net_box.lua | 116 +++++++++++++++++++++++++++++-------------------
> test/box/net.box.result | 14 ++++++
> test/box/sql.result | 2 +
> 3 files changed, 87 insertions(+), 45 deletions(-)
>
> diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
> index 4ed2b375d..3868cdf1c 100644
> --- a/src/box/lua/net_box.lua
> +++ b/src/box/lua/net_box.lua
> @@ -50,7 +50,34 @@ local E_PROC_LUA = box.error.PROC_LUA
>
> -- utility tables
> local is_final_state = {closed = 1, error = 1}
> -local method_codec = {
> +
> +local function decode_nil(...) end
> +local function decode_nothing(...) return ... end
decode_nothing -> decode_nop
> +local function decode_one_tuple(response)
> + if response[1] then
> + return box.tuple.new(response[1])
> + end
> +end
decode_one_tuple -> decode_tuple
Why do you need the if () guard?
> +local function decode_single_tuple(response)
> + if response[2] then
> + return nil, box.error.MORE_THAN_ONE_TUPLE
> + end
> + if response[1] then
> + return box.tuple.new(response[1])
> + end
> +end
decode_single_tuple -> decode_get()
> +local function decode_select(response)
> + setmetatable(response, sequence_mt)
> + for i, v in pairs(response) do
> + response[i] = box.tuple.new(v)
> + end
> + return response
> +end
Why are you using pairs() rather than ipairs()?
How does this avoid double decode as you claim in changeset
comment?
> +local function decode_count(response)
> + return response[1]
> +end
> +
> @@ -336,7 +385,10 @@ local function create_transport(host, port, user, password, callback,
> -- Decode xrow.body[DATA] to Lua objects
> body, body_end_check = decode(body_rpos)
> assert(body_end == body_end_check, "invalid xrow length")
> - return res -- the length of xrow.body
> - elseif not err then
> - setmetatable(res, sequence_mt)
> - local postproc = method ~= 'eval' and method ~= 'call_17'
> - if postproc then
> - local tnew = box.tuple.new
You removed this local variable from the decoder - it was here
for a reason. Please put it back in your implementation of decoder.
> diff --git a/test/box/net.box.result b/test/box/net.box.result
> index cf7b27f0b..6a3713fc0 100644
> --- a/test/box/net.box.result
> +++ b/test/box/net.box.result
> @@ -416,9 +416,11 @@ cn.space.net_box_test_space:select({234}, { iterator = 'LT' })
> ...
> cn.space.net_box_test_space:update({1}, { { '+', 2, 2 } })
> ---
> +- null
net.box calling convention should be the same as local box, if a
tuple is not found, it should return nothing, not null.
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
^ permalink raw reply [flat|nested] 32+ messages in thread
* [tarantool-patches] Re: [PATCH 4/8] netbox: extend codec with 'decode' methods
2018-05-08 15:49 ` [tarantool-patches] " Konstantin Osipov
@ 2018-05-08 17:24 ` Vladislav Shpilevoy
0 siblings, 0 replies; 32+ messages in thread
From: Vladislav Shpilevoy @ 2018-05-08 17:24 UTC (permalink / raw)
To: tarantool-patches, Konstantin Osipov
Hello. Thanks for review.
On 08/05/2018 18:49, Konstantin Osipov wrote:
> * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [18/04/16 21:44]:
>> Netbox has a table 'method_codec' that is used to encode a
>> request by a method name. But a response is decoded out of codec.
>> It leads to
>> 1) decoding into Lua tables before decoding into tuples where
>> needed - it is double decoding and produces a lot of garbage;
>> 2) each method contains hacks like one_tuple(), or single tuple
>> check.
>>
>> These things can not be fixed with no real codec instead of
>> encoder only.
>>
>> Also global table with decoders is needed for #3107, where
>> a request could be sent async with no fiber blocking. An async
>> response when received already does not have a call context - it
>> has only method name.
>>
>> Needed for #3107
>> ---
>> src/box/lua/net_box.lua | 116 +++++++++++++++++++++++++++++-------------------
>> test/box/net.box.result | 14 ++++++
>> test/box/sql.result | 2 +
>> 3 files changed, 87 insertions(+), 45 deletions(-)
>>
>> diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
>> index 4ed2b375d..3868cdf1c 100644
>> --- a/src/box/lua/net_box.lua
>> +++ b/src/box/lua/net_box.lua
>> @@ -50,7 +50,34 @@ local E_PROC_LUA = box.error.PROC_LUA
>>
>> -- utility tables
>> local is_final_state = {closed = 1, error = 1}
>> -local method_codec = {
>> +
>> +local function decode_nil(...) end
>> +local function decode_nothing(...) return ... end
>
> decode_nothing -> decode_nop
You reviewed the old patch. It is already fixed at Vova's request.
>
>> +local function decode_one_tuple(response)
>> + if response[1] then
>> + return box.tuple.new(response[1])
>> + end
>> +end
>
> decode_one_tuple -> decode_tuple
Ok.
@@ -53,7 +53,7 @@ local is_final_state = {closed = 1, error = 1}
local function decode_nil(...) end
local function decode_nop(...) return ... end
-local function decode_one_tuple(response)
+local function decode_tuple(response)
if response[1] then
return box.tuple.new(response[1])
end
@@ -105,10 +105,10 @@ local method_decoder = {
call_16 = decode_select,
call_17 = decode_nop,
eval = decode_nop,
- insert = decode_one_tuple,
- replace = decode_one_tuple,
- delete = decode_one_tuple,
- update = decode_one_tuple,
+ insert = decode_tuple,
+ replace = decode_tuple,
+ delete = decode_tuple,
+ update = decode_tuple,
upsert = decode_nil,
select = decode_select,
get = decode_unique_tuple,
>
> Why do you need the if () guard?
Body can be empty - in such a case there is no sense to allocate
cdata.
>
>> +local function decode_single_tuple(response)
>> + if response[2] then
>> + return nil, box.error.MORE_THAN_ONE_TUPLE
>> + end
>> + if response[1] then
>> + return box.tuple.new(response[1])
>> + end
>> +end
>
> decode_single_tuple -> decode_get()
Ok.
@@ -58,7 +58,7 @@ local function decode_tuple(response)
return box.tuple.new(response[1])
end
end
-local function decode_unique_tuple(response)
+local function decode_get(response)
if response[2] then
return nil, box.error.MORE_THAN_ONE_TUPLE
end
@@ -111,9 +111,9 @@ local method_decoder = {
update = decode_tuple,
upsert = decode_nil,
select = decode_select,
- get = decode_unique_tuple,
- min = decode_unique_tuple,
- max = decode_unique_tuple,
+ get = decode_get,
+ min = decode_get,
+ max = decode_get,
count = decode_count,
inject = decode_nop,
>
>> +local function decode_select(response)
>> + setmetatable(response, sequence_mt)
>> + for i, v in pairs(response) do
>> + response[i] = box.tuple.new(v)
>> + end
>> + return response
>> +end
>
> Why are you using pairs() rather than ipairs()?
Pairs is faster.
https://stackoverflow.com/questions/8955085/should-i-use-ipairs-or-a-for-loop
>
> How does this avoid double decode as you claim in changeset
> comment?
In no way. I did not say that I fix it here. I just noted that it is a
problem whose solution requires separate decoders.
>
>> +local function decode_count(response)
>> + return response[1]
>> +end
>> +
>> @@ -336,7 +385,10 @@ local function create_transport(host, port, user, password, callback,
>> -- Decode xrow.body[DATA] to Lua objects
>> body, body_end_check = decode(body_rpos)
>> assert(body_end == body_end_check, "invalid xrow length")
>> - return res -- the length of xrow.body
>> - elseif not err then
>> - setmetatable(res, sequence_mt)
>> - local postproc = method ~= 'eval' and method ~= 'call_17'
>> - if postproc then
>> - local tnew = box.tuple.new
>
> You removed this local variable from the decoder - it was here
> for a reason. Please put it back in your implementation of decoder.
Ok.
@@ -68,8 +68,9 @@ local function decode_unique_tuple(response)
end
local function decode_select(response)
setmetatable(response, sequence_mt)
+ local tnew box.tuple.new
for i, v in pairs(response) do
- response[i] = box.tuple.new(v)
+ response[i] = tnew(v)
end
return response
end
>
>> diff --git a/test/box/net.box.result b/test/box/net.box.result
>> index cf7b27f0b..6a3713fc0 100644
>> --- a/test/box/net.box.result
>> +++ b/test/box/net.box.result
>> @@ -416,9 +416,11 @@ cn.space.net_box_test_space:select({234}, { iterator = 'LT' })
>> ...
>> cn.space.net_box_test_space:update({1}, { { '+', 2, 2 } })
>> ---
>> +- null
>
> net.box calling convention should be the same as local box, if a
> tuple is not found, it should return nothing, not null.
>
Ok.
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index 0e2485f66..f6eef56fb 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -68,7 +68,7 @@ local function decode_get(response)
end
local function decode_select(response)
setmetatable(response, sequence_mt)
- local tnew box.tuple.new
+ local tnew = box.tuple.new
for i, v in pairs(response) do
response[i] = tnew(v)
end
@@ -1099,6 +1099,12 @@ function console_methods:eval(line, timeout)
return res[1] or res
end
+local function nothing_or_data(value)
+ if value ~= nil then
+ return value
+ end
+end
+
space_metatable = function(remote)
local methods = {}
@@ -1129,7 +1135,8 @@ space_metatable = function(remote)
function methods:upsert(key, oplist, opts)
check_space_arg(self, 'upsert')
- return remote:_request('upsert', opts, self.id, key, oplist)
+ return nothing_or_data(remote:_request('upsert', opts, self.id, key,
+ oplist))
end
function methods:get(key, opts)
@@ -1167,8 +1174,8 @@ index_metatable = function(remote)
if opts and opts.buffer then
error("index:get() doesn't support `buffer` argument")
end
- return remote:_request('get', opts, self.space.id, self.id,
- box.index.EQ, 0, 2, key)
+ return nothing_or_data(remote:_request('get', opts, self.space.id,
+ self.id, box.index.EQ, 0, 2, key))
end
function methods:min(key, opts)
@@ -1176,8 +1183,9 @@ index_metatable = function(remote)
if opts and opts.buffer then
error("index:min() doesn't support `buffer` argument")
end
- return remote:_request('min', opts, self.space.id, self.id,
- box.index.GE, 0, 1, key)
+ return nothing_or_data(remote:_request('min', opts, self.space.id,
+ self.id, box.index.GE, 0, 1,
+ key))
end
function methods:max(key, opts)
@@ -1185,8 +1193,9 @@ index_metatable = function(remote)
if opts and opts.buffer then
error("index:max() doesn't support `buffer` argument")
end
- return remote:_request('max', opts, self.space.id, self.id,
- box.index.LE, 0, 1, key)
+ return nothing_or_data(remote:_request('max', opts, self.space.id,
+ self.id, box.index.LE, 0, 1,
+ key))
end
function methods:count(key, opts)
@@ -1201,13 +1210,14 @@ index_metatable = function(remote)
function methods:delete(key, opts)
check_index_arg(self, 'delete')
- return remote:_request('delete', opts, self.space.id, self.id, key)
+ return nothing_or_data(remote:_request('delete', opts, self.space.id,
+ self.id, key))
end
function methods:update(key, oplist, opts)
check_index_arg(self, 'update')
- return remote:_request('update', opts, self.space.id, self.id, key,
- oplist)
+ return nothing_or_data(remote:_request('update', opts, self.space.id,
+ self.id, key, oplist))
end
return { __index = methods, __metatable = false }
diff --git a/test/box/net.box.result b/test/box/net.box.result
index 6a3713fc0..4fbc70ec6 100644
--- a/test/box/net.box.result
+++ b/test/box/net.box.result
@@ -416,11 +416,9 @@ cn.space.net_box_test_space:select({234}, { iterator = 'LT' })
...
cn.space.net_box_test_space:update({1}, { { '+', 2, 2 } })
---
-- null
...
cn.space.net_box_test_space:delete{1}
---
-- null
...
cn.space.net_box_test_space:delete{2}
---
@@ -428,7 +426,6 @@ cn.space.net_box_test_space:delete{2}
...
cn.space.net_box_test_space:delete{2}
---
-- null
...
-- test one-based indexing in splice operation (see update.test.lua)
cn.space.net_box_test_space:replace({10, 'abcde'})
@@ -757,15 +754,12 @@ remote_space:upsert({3}, {}, { timeout = 1e-9 })
...
remote_space:upsert({4}, {})
---
-- null
...
remote_space:upsert({5}, {}, { timeout = 1.00 })
---
-- null
...
remote_space:upsert({3}, {})
---
-- null
...
remote_space:update({3}, {}, { timeout = 1e-9 })
---
@@ -987,15 +981,12 @@ _ = remote_pk:delete({5})
...
remote_space:get(0)
---
-- null
...
remote_space:get(1)
---
-- null
...
remote_space:get(2)
---
-- null
...
remote_space = nil
---
@@ -1327,7 +1318,6 @@ c.space.test:select{}
...
c.space.test:upsert({1, 2, 'nothing'}, {{'+', 2, 1}}) -- common update
---
-- null
...
c.space.test:select{}
---
@@ -1335,7 +1325,6 @@ c.space.test:select{}
...
c.space.test:upsert({2, 4, 'something'}, {{'+', 2, 1}}) -- insert
---
-- null
...
c.space.test:select{}
---
@@ -1344,7 +1333,6 @@ c.space.test:select{}
...
c.space.test:upsert({2, 4, 'nothing'}, {{'+', 3, 100500}}) -- wrong operation
---
-- null
...
c.space.test:select{}
---
diff --git a/test/box/sql.result b/test/box/sql.result
index 95f8da7dd..11a698850 100644
--- a/test/box/sql.result
+++ b/test/box/sql.result
@@ -105,7 +105,6 @@ space:select{1}
-- xxx: update comes through, returns 0 rows affected
space:update(1, {{'=', 2, 'I am a new tuple'}})
---
-- null
...
-- nothing is selected, since nothing was there
space:select{1}
@@ -209,7 +208,6 @@ space:delete(0)
...
space:delete(4294967295)
---
-- null
...
box.space.test:drop()
---
^ permalink raw reply [flat|nested] 32+ messages in thread
* [PATCH 5/8] test: fix unstable test
2018-04-16 18:39 [PATCH 0/8] netbox: introduce fiber-async API Vladislav Shpilevoy
` (3 preceding siblings ...)
2018-04-16 18:39 ` [PATCH 4/8] netbox: extend codec with 'decode' methods Vladislav Shpilevoy
@ 2018-04-16 18:39 ` Vladislav Shpilevoy
2018-04-22 5:32 ` [tarantool-patches] " Kirill Yukhin
2018-05-08 15:50 ` Konstantin Osipov
2018-04-16 18:39 ` [PATCH 6/8] netbox: introduce fiber-async API Vladislav Shpilevoy
` (2 subsequent siblings)
7 siblings, 2 replies; 32+ messages in thread
From: Vladislav Shpilevoy @ 2018-04-16 18:39 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev
---
test/box/errinj.result | 3 +++
test/box/errinj.test.lua | 1 +
2 files changed, 4 insertions(+)
diff --git a/test/box/errinj.result b/test/box/errinj.result
index 958f82dd9..5b4bc23a3 100644
--- a/test/box/errinj.result
+++ b/test/box/errinj.result
@@ -1136,6 +1136,9 @@ errinj.set("ERRINJ_WAL_DELAY", false)
---
- ok
...
+while ok == nil do fiber.sleep(0.01) end
+---
+...
ok, err
---
- true
diff --git a/test/box/errinj.test.lua b/test/box/errinj.test.lua
index a3dde8458..e1460d1b6 100644
--- a/test/box/errinj.test.lua
+++ b/test/box/errinj.test.lua
@@ -385,6 +385,7 @@ end)
test_run:cmd('setopt delimiter ""');
cn.space.test:get{1}
errinj.set("ERRINJ_WAL_DELAY", false)
+while ok == nil do fiber.sleep(0.01) end
ok, err
cn:close()
s:drop()
--
2.15.1 (Apple Git-101)
^ permalink raw reply [flat|nested] 32+ messages in thread
* Re: [tarantool-patches] [PATCH 5/8] test: fix unstable test
2018-04-16 18:39 ` [PATCH 5/8] test: fix unstable test Vladislav Shpilevoy
@ 2018-04-22 5:32 ` Kirill Yukhin
2018-05-08 15:50 ` Konstantin Osipov
1 sibling, 0 replies; 32+ messages in thread
From: Kirill Yukhin @ 2018-04-22 5:32 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev
Hello Vlad,
On 16 апр 21:39, Vladislav Shpilevoy wrote:
> ---
> test/box/errinj.result | 3 +++
> test/box/errinj.test.lua | 1 +
> 2 files changed, 4 insertions(+)
I am so tired of this flaky test that I've decided to commit this patch
out-of-cycle. Committed to 1.10 and 2.1 branches.
Could you pls evict it from your patch-set?
--
Regards, Kirill Yukhin
^ permalink raw reply [flat|nested] 32+ messages in thread
* Re: [tarantool-patches] [PATCH 5/8] test: fix unstable test
2018-04-16 18:39 ` [PATCH 5/8] test: fix unstable test Vladislav Shpilevoy
2018-04-22 5:32 ` [tarantool-patches] " Kirill Yukhin
@ 2018-05-08 15:50 ` Konstantin Osipov
1 sibling, 0 replies; 32+ messages in thread
From: Konstantin Osipov @ 2018-05-08 15:50 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev
* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [18/04/16 21:44]:
> ---
> test/box/errinj.result | 3 +++
> test/box/errinj.test.lua | 1 +
> 2 files changed, 4 insertions(+)
This has been pushed already, should not have been in this patch
set in the first place.
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
^ permalink raw reply [flat|nested] 32+ messages in thread
* [PATCH 6/8] netbox: introduce fiber-async API
2018-04-16 18:39 [PATCH 0/8] netbox: introduce fiber-async API Vladislav Shpilevoy
` (4 preceding siblings ...)
2018-04-16 18:39 ` [PATCH 5/8] test: fix unstable test Vladislav Shpilevoy
@ 2018-04-16 18:39 ` Vladislav Shpilevoy
2018-04-23 12:31 ` [tarantool-patches] " Alexander Turenko
2018-04-23 16:44 ` Vladimir Davydov
2018-04-16 18:39 ` [PATCH 7/8] netbox: remove schema_version from requests Vladislav Shpilevoy
2018-04-16 18:39 ` [PATCH 8/8] netbox: implement perform_request via async version Vladislav Shpilevoy
7 siblings, 2 replies; 32+ messages in thread
From: Vladislav Shpilevoy @ 2018-04-16 18:39 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev
Now any netbox call blocks a caller-fiber until a result is read
from a socket, or time is out. To use it asynchronously it is
necessary to create a fiber per request. Sometimes it is
unwanted - for example if RPS is very high (for example, about
100k), and latency is about 1 second. Or when it is necessary
to send multiple requests in parallel and then collect responses
(map-reduce).
The patch introduces a new option for all netbox requests:
is_async. With this option any called netbox method immediately
returns a 'future' object (but still yields for a moment).
Using a future object, a user can check if the request is finalized,
get a result or error, wait for a timeout, discard a response.
Example of is_async usage:
future = conn:call(func, {params}, {..., is_async = true})
-- Do some work ...
if not future:is_ready() then
result, err = future:wait_result(timeout)
end
-- Or:
result, error = future:result()
Both future:result() and future:wait_result() return either an error or
a response in the same format as the sync versions of the called
methods.
Part of #3107
---
src/box/lua/net_box.lua | 159 ++++++++++++--
test/box/net.box.result | 519 +++++++++++++++++++++++++++++++++++++++++++++-
test/box/net.box.test.lua | 186 ++++++++++++++++-
3 files changed, 836 insertions(+), 28 deletions(-)
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index 3868cdf1c..96f528963 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -214,12 +214,18 @@ local function create_transport(host, port, user, password, callback,
local last_error
local state_cond = fiber.cond() -- signaled when the state changes
- -- requests: requests currently 'in flight', keyed by a request id;
- -- value refs are weak hence if a client dies unexpectedly,
- -- GC cleans the mess. Client submits a request and waits on state_cond.
- -- If the reponse arrives within the timeout, the worker wakes
- -- client fiber explicitly. Otherwize, wait on state_cond completes and
- -- the client reports E_TIMEOUT.
+ -- Async requests currently 'in flight', keyed by a request
+ -- id. Value refs are weak hence if a client dies
+ -- unexpectedly, GC cleans the mess. Client either submits a
+ -- request and waits on state_cond, OR makes an async request
+ -- and does not block until a response is received. If the
+ -- request is not async and the response arrives within the
+ -- timeout, the worker wakes client fiber explicitly.
+ -- Otherwise, wait on state_cond completes and the client
+ -- reports E_TIMEOUT.
+ -- Async request can not be timed out completely. Instead a
+ -- user must decide when he does not want to wait for
+ -- response anymore.
local requests = setmetatable({}, { __mode = 'v' })
local next_request_id = 1
@@ -227,6 +233,94 @@ local function create_transport(host, port, user, password, callback,
local send_buf = buffer.ibuf(buffer.READAHEAD)
local recv_buf = buffer.ibuf(buffer.READAHEAD)
+ local function wakeup_client(client)
+ if client and client:status() ~= 'dead' then
+ client:wakeup()
+ end
+ end
+
+ --
+ -- Async request metamethods.
+ --
+ local request_index = {}
+ --
+ -- When an async request is finalized (with ok or error - no
+ -- matter), its 'id' field is nullified by a response
+ -- dispatcher.
+ --
+ function request_index:is_ready()
+ return self.id == nil or worker_fiber == nil
+ end
+ --
+ -- When a request is finished, a result can be got from a
+ -- future object anytime.
+ -- @retval result, nil Success, the response is returned.
+ -- @retval nil, error Error occurred.
+ --
+ function request_index:result()
+ if self.errno then
+ return nil, box.error.new({code = self.errno,
+ reason = self.response})
+ elseif not self.id then
+ return self.response
+ elseif not worker_fiber then
+ return nil, box.error.new(E_NO_CONNECTION)
+ else
+ return nil, box.error.new(box.error.PROC_LUA,
+ 'Response is not ready')
+ end
+ end
+ --
+ -- Wait for a response or error max timeout seconds.
+ -- @param timeout Max seconds to wait.
+ -- @retval result, nil Success, the response is returned.
+ -- @retval nil, error Error occurred.
+ --
+ function request_index:wait_result(timeout)
+ if timeout then
+ if type(timeout) ~= 'number' or timeout < 0 then
+ error('Usage: future:wait_result(timeout)')
+ end
+ else
+ timeout = TIMEOUT_INFINITY
+ end
+ if not self:is_ready() then
+ -- When a response is ready before timeout, the
+ -- waiting client is waked up spuriously.
+ local old_client = self.client
+ self.client = fiber.self()
+ while timeout > 0 and not self:is_ready() do
+ local ts = fiber.clock()
+ state_cond:wait(timeout)
+ timeout = timeout - (fiber.clock() - ts)
+ end
+ self.client = old_client
+ if not self:is_ready() then
+ return nil, box.error.new(E_TIMEOUT)
+ end
+ -- It is possible that multiple fibers are waiting for
+ -- a result. In such a case a first, who got it, must
+ -- wakeup the previous waiting client. This one wakes
+ -- up another. Another wakes up third one, etc.
+ wakeup_client(old_client)
+ end
+ return self:result()
+ end
+ --
+ -- Make a connection forget about the response. When it is
+ -- received, it is ignored.
+ --
+ function request_index:discard()
+ if self.id then
+ requests[self.id] = nil
+ self.id = nil
+ self.errno = box.error.PROC_LUA
+ self.response = 'Response is discarded'
+ end
+ end
+
+ local request_mt = { __index = request_index }
+
-- STATE SWITCHING --
local function set_state(new_state, new_errno, new_error)
state = new_state
@@ -236,6 +330,7 @@ local function create_transport(host, port, user, password, callback,
state_cond:broadcast()
if state == 'error' or state == 'error_reconnect' then
for _, request in pairs(requests) do
+ request.id = nil
request.errno = new_errno
request.response = new_error
end
@@ -315,12 +410,16 @@ local function create_transport(host, port, user, password, callback,
end
end
- -- REQUEST/RESPONSE --
- local function perform_request(timeout, buffer, method, schema_version, ...)
+ --
+ -- Send a request and do not wait for response.
+ -- @retval nil, error Error occurred.
+ -- @retval not nil Future object.
+ --
+ local function perform_async_request(buffer, method, schema_version, ...)
if state ~= 'active' then
- return last_errno or E_NO_CONNECTION, last_error
+ return nil, box.error.new({code = last_errno or E_NO_CONNECTION,
+ reason = last_error})
end
- local deadline = fiber_clock() + (timeout or TIMEOUT_INFINITY)
-- alert worker to notify it of the queued outgoing data;
-- if the buffer wasn't empty, assume the worker was already alerted
if send_buf:size() == 0 then
@@ -329,12 +428,27 @@ local function create_transport(host, port, user, password, callback,
local id = next_request_id
method_encoder[method](send_buf, id, schema_version, ...)
next_request_id = next_id(id)
- local request = table_new(0, 6) -- reserve space for 6 keys
- request.client = fiber_self()
+ local request = setmetatable(table_new(0, 7), request_mt)
request.method = method
request.schema_version = schema_version
request.buffer = buffer
+ request.id = id
requests[id] = request
+ return request
+ end
+
+ --
+ -- Send a request and wait for response.
+ --
+ local function perform_request(timeout, buffer, method, schema_version, ...)
+ local request, err =
+ perform_async_request(buffer, method, schema_version, ...)
+ if not request then
+ return last_errno or E_NO_CONNECTION, last_error
+ end
+ request.client = fiber_self()
+ local id = request.id
+ local deadline = fiber_clock() + (timeout or TIMEOUT_INFINITY)
repeat
local timeout = max(0, deadline - fiber_clock())
if not state_cond:wait(timeout) then
@@ -345,12 +459,6 @@ local function create_transport(host, port, user, password, callback,
return request.errno, request.response
end
- local function wakeup_client(client)
- if client:status() ~= 'dead' then
- client:wakeup()
- end
- end
-
local function dispatch_response_iproto(hdr, body_rpos, body_end)
local id = hdr[IPROTO_SYNC_KEY]
local request = requests[id]
@@ -358,6 +466,7 @@ local function create_transport(host, port, user, password, callback,
return
end
requests[id] = nil
+ request.id = nil
local status = hdr[IPROTO_STATUS_KEY]
local body, body_end_check
@@ -607,7 +716,8 @@ local function create_transport(host, port, user, password, callback,
stop = stop,
start = start,
wait_state = wait_state,
- perform_request = perform_request
+ perform_request = perform_request,
+ perform_async_request = perform_async_request,
}
end
@@ -864,8 +974,12 @@ function remote_methods:wait_connected(timeout)
end
function remote_methods:_request(method, opts, ...)
- local this_fiber = fiber_self()
local transport = self._transport
+ local buffer = opts and opts.buffer
+ if opts and opts.is_async then
+ return transport.perform_async_request(buffer, method, 0, ...)
+ end
+ local this_fiber = fiber_self()
local perform_request = transport.perform_request
local wait_state = transport.wait_state
local deadline = nil
@@ -877,7 +991,6 @@ function remote_methods:_request(method, opts, ...)
-- @deprecated since 1.7.4
deadline = self._deadlines[this_fiber]
end
- local buffer = opts and opts.buffer
local err, res
repeat
local timeout = deadline and max(0, deadline - fiber_clock())
@@ -928,7 +1041,7 @@ function remote_methods:call(func_name, args, opts)
check_call_args(args)
args = args or {}
local res = self:_request('call_17', opts, tostring(func_name), args)
- if type(res) ~= 'table' then
+ if type(res) ~= 'table' or opts and opts.is_async then
return res
end
return unpack(res)
@@ -945,7 +1058,7 @@ function remote_methods:eval(code, args, opts)
check_eval_args(args)
args = args or {}
local res = self:_request('eval', opts, code, args)
- if type(res) ~= 'table' then
+ if type(res) ~= 'table' or opts and opts.is_async then
return res
end
return unpack(res)
diff --git a/test/box/net.box.result b/test/box/net.box.result
index 6a3713fc0..aaa421ec6 100644
--- a/test/box/net.box.result
+++ b/test/box/net.box.result
@@ -2475,9 +2475,6 @@ box.internal.collation.drop('test')
space:drop()
---
...
-box.schema.user.revoke('guest', 'read,write,execute', 'universe')
----
-...
c.state
---
- closed
@@ -2485,3 +2482,519 @@ c.state
c = nil
---
...
+--
+-- gh-3107: fiber-async netbox.
+--
+f = nil
+---
+...
+function long_function(...) f = fiber.self() fiber.sleep(1000000) return ... end
+---
+...
+s = box.schema.create_space('test')
+---
+...
+pk = s:create_index('pk')
+---
+...
+s:replace{1}
+---
+- [1]
+...
+s:replace{2}
+---
+- [2]
+...
+s:replace{3}
+---
+- [3]
+...
+s:replace{4}
+---
+- [4]
+...
+c = net:connect(box.cfg.listen)
+---
+...
+--
+-- Check long connections, multiple wait_result().
+--
+future = c:call('long_function', {1, 2, 3}, {is_async = true})
+---
+...
+future:result()
+---
+- null
+- Response is not ready
+...
+future:is_ready()
+---
+- false
+...
+future:wait_result(0.01)
+---
+- null
+- Timeout exceeded
+...
+f:wakeup()
+---
+...
+ret = future:wait_result(0.01)
+---
+...
+future:is_ready()
+---
+- true
+...
+future:wait_result(0.01)
+---
+- [1, 2, 3]
+...
+ret
+---
+- [1, 2, 3]
+...
+_, err = pcall(future.wait_result, future, true)
+---
+...
+err:find('Usage') ~= nil
+---
+- true
+...
+_, err = pcall(future.wait_result, future, '100')
+---
+...
+err:find('Usage') ~= nil
+---
+- true
+...
+--
+-- Check infinity timeout.
+--
+ret = nil
+---
+...
+_ = fiber.create(function() ret = c:call('long_function', {1, 2, 3}, {is_async = true}):wait_result() end)
+---
+...
+f:wakeup()
+---
+...
+while not ret do fiber.sleep(0.01) end
+---
+...
+ret
+---
+- [1, 2, 3]
+...
+future = c:eval('return long_function(...)', {1, 2, 3}, {is_async = true})
+---
+...
+future:result()
+---
+- null
+- Response is not ready
+...
+future:wait_result(0.01)
+---
+- null
+- Timeout exceeded
+...
+f:wakeup()
+---
+...
+future:wait_result(0.01)
+---
+- [1, 2, 3]
+...
+--
+-- Ensure the request is garbage collected both if is not used and
+-- if is.
+--
+gc_test = setmetatable({}, {__mode = 'v'})
+---
+...
+gc_test.future = c:call('long_function', {1, 2, 3}, {is_async = true})
+---
+...
+gc_test.future ~= nil
+---
+- true
+...
+collectgarbage()
+---
+- 0
+...
+gc_test
+---
+- []
+...
+f:wakeup()
+---
+...
+future = c:call('long_function', {1, 2, 3}, {is_async = true})
+---
+...
+collectgarbage()
+---
+- 0
+...
+future ~= nil
+---
+- true
+...
+f:wakeup()
+---
+...
+future:wait_result(1000)
+---
+- [1, 2, 3]
+...
+collectgarbage()
+---
+- 0
+...
+future ~= nil
+---
+- true
+...
+gc_test.future = future
+---
+...
+future = nil
+---
+...
+collectgarbage()
+---
+- 0
+...
+gc_test
+---
+- []
+...
+--
+-- Ensure a request can be finalized from non-caller fibers.
+--
+future = c:call('long_function', {1, 2, 3}, {is_async = true})
+---
+...
+ret = {}
+---
+...
+count = 0
+---
+...
+for i = 1, 10 do fiber.create(function() ret[i] = future:wait_result(1000) count = count + 1 end) end
+---
+...
+future:wait_result(0.01)
+---
+- null
+- Timeout exceeded
+...
+f:wakeup()
+---
+...
+while count ~= 10 do fiber.sleep(0.1) end
+---
+...
+ret
+---
+- - &0 [1, 2, 3]
+ - *0
+ - *0
+ - *0
+ - *0
+ - *0
+ - *0
+ - *0
+ - *0
+ - *0
+...
+--
+-- Test space methods.
+--
+future = c.space.test:select({1}, {is_async = true})
+---
+...
+ret = future:wait_result(100)
+---
+...
+ret
+---
+- - [1]
+...
+type(ret[1])
+---
+- cdata
+...
+future = c.space.test:insert({5}, {is_async = true})
+---
+...
+future:wait_result(100)
+---
+- [5]
+...
+s:get{5}
+---
+- [5]
+...
+future = c.space.test:replace({6}, {is_async = true})
+---
+...
+future:wait_result(100)
+---
+- [6]
+...
+s:get{6}
+---
+- [6]
+...
+future = c.space.test:delete({6}, {is_async = true})
+---
+...
+future:wait_result(100)
+---
+- [6]
+...
+s:get{6}
+---
+...
+future = c.space.test:update({5}, {{'=', 2, 5}}, {is_async = true})
+---
+...
+future:wait_result(100)
+---
+- [5, 5]
+...
+s:get{5}
+---
+- [5, 5]
+...
+future = c.space.test:upsert({5}, {{'=', 2, 6}}, {is_async = true})
+---
+...
+future:wait_result(100)
+---
+- null
+...
+s:get{5}
+---
+- [5, 6]
+...
+future = c.space.test:get({5}, {is_async = true})
+---
+...
+future:wait_result(100)
+---
+- [5, 6]
+...
+--
+-- Test index methods.
+--
+future = c.space.test.index.pk:select({1}, {is_async = true})
+---
+...
+future:wait_result(100)
+---
+- - [1]
+...
+future = c.space.test.index.pk:get({2}, {is_async = true})
+---
+...
+future:wait_result(100)
+---
+- [2]
+...
+future = c.space.test.index.pk:min({}, {is_async = true})
+---
+...
+future:wait_result(100)
+---
+- [1]
+...
+future = c.space.test.index.pk:max({}, {is_async = true})
+---
+...
+future:wait_result(100)
+---
+- [5, 6]
+...
+future = c.space.test.index.pk:count({3}, {is_async = true})
+---
+...
+future:wait_result(100)
+---
+- 1
+...
+future = c.space.test.index.pk:delete({3}, {is_async = true})
+---
+...
+future:wait_result(100)
+---
+- [3]
+...
+s:get{3}
+---
+...
+future = c.space.test.index.pk:update({4}, {{'=', 2, 6}}, {is_async = true})
+---
+...
+future:wait_result(100)
+---
+- [4, 6]
+...
+s:get{4}
+---
+- [4, 6]
+...
+--
+-- Test async errors.
+--
+future = c.space.test:insert({1}, {is_async = true})
+---
+...
+future:wait_result()
+---
+- null
+- Duplicate key exists in unique index 'pk' in space 'test'
+...
+future:result()
+---
+- null
+- Duplicate key exists in unique index 'pk' in space 'test'
+...
+--
+-- Test discard.
+--
+future = c:call('long_function', {1, 2, 3}, {is_async = true})
+---
+...
+future:discard()
+---
+...
+f:wakeup()
+---
+...
+future:result()
+---
+- null
+- Response is discarded
+...
+future:wait_result(100)
+---
+- null
+- Response is discarded
+...
+--
+-- Test closed connection.
+--
+future = c:call('long_function', {1, 2, 3}, {is_async = true})
+---
+...
+f:wakeup()
+---
+...
+future:wait_result(100)
+---
+- [1, 2, 3]
+...
+future2 = c:call('long_function', {1, 2, 3}, {is_async = true})
+---
+...
+c:close()
+---
+...
+future2:wait_result(100)
+---
+- null
+- Connection is not established
+...
+future2:result()
+---
+- null
+- Connection is not established
+...
+future2:discard()
+---
+...
+-- Already successful result must be available.
+future:wait_result(100)
+---
+- [1, 2, 3]
+...
+future:result()
+---
+- [1, 2, 3]
+...
+future:is_ready()
+---
+- true
+...
+--
+-- Test reconnect.
+--
+c = net:connect(box.cfg.listen, {reconnect_after = 0.01})
+---
+...
+future = c:call('long_function', {1, 2, 3}, {is_async = true})
+---
+...
+_ = c._transport.perform_request(nil, nil, 'inject', nil, '\x80')
+---
+...
+while not c:is_connected() do fiber.sleep(0.01) end
+---
+...
+future:wait_result(100)
+---
+- null
+- Peer closed
+...
+future:result()
+---
+- null
+- Peer closed
+...
+future = c:call('long_function', {1, 2, 3}, {is_async = true})
+---
+...
+f:wakeup()
+---
+...
+future:wait_result(100)
+---
+- [1, 2, 3]
+...
+--
+-- Test raw response getting.
+--
+ibuf = require('buffer').ibuf()
+---
+...
+future = c:call('long_function', {1, 2, 3}, {is_async = true, buffer = ibuf})
+---
+...
+f:wakeup()
+---
+...
+future:wait_result(100)
+---
+- 10
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- {48: [1, 2, 3]}
+...
+c:close()
+---
+...
+s:drop()
+---
+...
+box.schema.user.revoke('guest', 'read,write,execute', 'universe')
+---
+...
diff --git a/test/box/net.box.test.lua b/test/box/net.box.test.lua
index 576b5cfea..82c538fbe 100644
--- a/test/box/net.box.test.lua
+++ b/test/box/net.box.test.lua
@@ -1004,8 +1004,190 @@ c.space.test.index.sk.parts
c:close()
box.internal.collation.drop('test')
space:drop()
-
-box.schema.user.revoke('guest', 'read,write,execute', 'universe')
c.state
c = nil
+--
+-- gh-3107: fiber-async netbox.
+--
+f = nil
+function long_function(...) f = fiber.self() fiber.sleep(1000000) return ... end
+s = box.schema.create_space('test')
+pk = s:create_index('pk')
+s:replace{1}
+s:replace{2}
+s:replace{3}
+s:replace{4}
+c = net:connect(box.cfg.listen)
+--
+-- Check long connections, multiple wait_result().
+--
+future = c:call('long_function', {1, 2, 3}, {is_async = true})
+future:result()
+future:is_ready()
+future:wait_result(0.01)
+f:wakeup()
+ret = future:wait_result(0.01)
+future:is_ready()
+future:wait_result(0.01)
+ret
+
+_, err = pcall(future.wait_result, future, true)
+err:find('Usage') ~= nil
+_, err = pcall(future.wait_result, future, '100')
+err:find('Usage') ~= nil
+
+--
+-- Check infinity timeout.
+--
+ret = nil
+_ = fiber.create(function() ret = c:call('long_function', {1, 2, 3}, {is_async = true}):wait_result() end)
+f:wakeup()
+while not ret do fiber.sleep(0.01) end
+ret
+
+future = c:eval('return long_function(...)', {1, 2, 3}, {is_async = true})
+future:result()
+future:wait_result(0.01)
+f:wakeup()
+future:wait_result(0.01)
+
+--
+-- Ensure the request is garbage collected both if is not used and
+-- if is.
+--
+gc_test = setmetatable({}, {__mode = 'v'})
+gc_test.future = c:call('long_function', {1, 2, 3}, {is_async = true})
+gc_test.future ~= nil
+collectgarbage()
+gc_test
+f:wakeup()
+
+future = c:call('long_function', {1, 2, 3}, {is_async = true})
+collectgarbage()
+future ~= nil
+f:wakeup()
+future:wait_result(1000)
+collectgarbage()
+future ~= nil
+gc_test.future = future
+future = nil
+collectgarbage()
+gc_test
+
+--
+-- Ensure a request can be finalized from non-caller fibers.
+--
+future = c:call('long_function', {1, 2, 3}, {is_async = true})
+ret = {}
+count = 0
+for i = 1, 10 do fiber.create(function() ret[i] = future:wait_result(1000) count = count + 1 end) end
+future:wait_result(0.01)
+f:wakeup()
+while count ~= 10 do fiber.sleep(0.1) end
+ret
+
+--
+-- Test space methods.
+--
+future = c.space.test:select({1}, {is_async = true})
+ret = future:wait_result(100)
+ret
+type(ret[1])
+future = c.space.test:insert({5}, {is_async = true})
+future:wait_result(100)
+s:get{5}
+future = c.space.test:replace({6}, {is_async = true})
+future:wait_result(100)
+s:get{6}
+future = c.space.test:delete({6}, {is_async = true})
+future:wait_result(100)
+s:get{6}
+future = c.space.test:update({5}, {{'=', 2, 5}}, {is_async = true})
+future:wait_result(100)
+s:get{5}
+future = c.space.test:upsert({5}, {{'=', 2, 6}}, {is_async = true})
+future:wait_result(100)
+s:get{5}
+future = c.space.test:get({5}, {is_async = true})
+future:wait_result(100)
+
+--
+-- Test index methods.
+--
+future = c.space.test.index.pk:select({1}, {is_async = true})
+future:wait_result(100)
+future = c.space.test.index.pk:get({2}, {is_async = true})
+future:wait_result(100)
+future = c.space.test.index.pk:min({}, {is_async = true})
+future:wait_result(100)
+future = c.space.test.index.pk:max({}, {is_async = true})
+future:wait_result(100)
+future = c.space.test.index.pk:count({3}, {is_async = true})
+future:wait_result(100)
+future = c.space.test.index.pk:delete({3}, {is_async = true})
+future:wait_result(100)
+s:get{3}
+future = c.space.test.index.pk:update({4}, {{'=', 2, 6}}, {is_async = true})
+future:wait_result(100)
+s:get{4}
+
+--
+-- Test async errors.
+--
+future = c.space.test:insert({1}, {is_async = true})
+future:wait_result()
+future:result()
+
+--
+-- Test discard.
+--
+future = c:call('long_function', {1, 2, 3}, {is_async = true})
+future:discard()
+f:wakeup()
+future:result()
+future:wait_result(100)
+
+--
+-- Test closed connection.
+--
+future = c:call('long_function', {1, 2, 3}, {is_async = true})
+f:wakeup()
+future:wait_result(100)
+future2 = c:call('long_function', {1, 2, 3}, {is_async = true})
+c:close()
+future2:wait_result(100)
+future2:result()
+future2:discard()
+-- Already successful result must be available.
+future:wait_result(100)
+future:result()
+future:is_ready()
+
+--
+-- Test reconnect.
+--
+c = net:connect(box.cfg.listen, {reconnect_after = 0.01})
+future = c:call('long_function', {1, 2, 3}, {is_async = true})
+_ = c._transport.perform_request(nil, nil, 'inject', nil, '\x80')
+while not c:is_connected() do fiber.sleep(0.01) end
+future:wait_result(100)
+future:result()
+future = c:call('long_function', {1, 2, 3}, {is_async = true})
+f:wakeup()
+future:wait_result(100)
+
+--
+-- Test raw response getting.
+--
+ibuf = require('buffer').ibuf()
+future = c:call('long_function', {1, 2, 3}, {is_async = true, buffer = ibuf})
+f:wakeup()
+future:wait_result(100)
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
+c:close()
+s:drop()
+
+box.schema.user.revoke('guest', 'read,write,execute', 'universe')
--
2.15.1 (Apple Git-101)
^ permalink raw reply [flat|nested] 32+ messages in thread
* [tarantool-patches] Re: [PATCH 6/8] netbox: introduce fiber-async API
2018-04-16 18:39 ` [PATCH 6/8] netbox: introduce fiber-async API Vladislav Shpilevoy
@ 2018-04-23 12:31 ` Alexander Turenko
2018-04-23 18:59 ` Vladislav Shpilevoy
2018-04-23 16:44 ` Vladimir Davydov
1 sibling, 1 reply; 32+ messages in thread
From: Alexander Turenko @ 2018-04-23 12:31 UTC (permalink / raw)
To: Vladislav Shpilevoy; +Cc: tarantool-patches
Hi Vlad!
Minor wording comments are below (Vlad asks to track it here).
WBR, Alexander Turenko.
On Mon, Apr 16, 2018 at 09:39:16PM +0300, Vladislav Shpilevoy wrote:
> Now any netbox call blocks a caller-fiber until a result is read
> from a socket, or time is out. To use it asynchronously it is
> necessary to create a fiber per request. Sometimes it is
> unwanted - for example if RPS is very high (for example, about
> 100k), and latency is about 1 second. Or when it is neccessary
> to send multiple requests in paralles and then collect responses
> (map-reduce).
Paralles -> parallel.
> + function request_index:wait_result(timeout)
> ...
> + if not self:is_ready() then
> + -- When a response is ready before timeout, the
> + -- waiting client is waked up spuriously.
Do you mean prematurely?
^ permalink raw reply [flat|nested] 32+ messages in thread
* [tarantool-patches] Re: [PATCH 6/8] netbox: introduce fiber-async API
2018-04-23 12:31 ` [tarantool-patches] " Alexander Turenko
@ 2018-04-23 18:59 ` Vladislav Shpilevoy
0 siblings, 0 replies; 32+ messages in thread
From: Vladislav Shpilevoy @ 2018-04-23 18:59 UTC (permalink / raw)
To: tarantool-patches, Alexander Turenko
Hello. Thanks for review!
On 23/04/2018 15:31, Alexander Turenko wrote:
> Hi Vlad!
>
> Minor wording comments are below (Vlad asks to track it here).
>
> WBR, Alexander Turenko.
>
> On Mon, Apr 16, 2018 at 09:39:16PM +0300, Vladislav Shpilevoy wrote:
>> Now any netbox call blocks a caller-fiber until a result is read
>> from a socket, or time is out. To use it asynchronously it is
>> necessary to create a fiber per request. Sometimes it is
>> unwanted - for example if RPS is very high (for example, about
>> 100k), and latency is about 1 second. Or when it is neccessary
>> to send multiple requests in paralles and then collect responses
>> (map-reduce).
>
> Paralles -> parallel.
>
>> + function request_index:wait_result(timeout)
>> ...
>> + if not self:is_ready() then
>> + -- When a response is ready before timeout, the
>> + -- waiting client is waked up spuriously.
>
> Do you mean prematurely?
Fixed on branch.
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index 85f31b8e8..e3680006e 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -286,7 +286,7 @@ local function create_transport(host, port, user, password, callback,
end
if not self:is_ready() then
-- When a response is ready before timeout, the
- -- waiting client is waked up spuriously.
+ -- waiting client is waked up prematurely.
local old_client = self.client
self.client = fiber.self()
while timeout > 0 and not self:is_ready() do
^ permalink raw reply [flat|nested] 32+ messages in thread
* Re: [PATCH 6/8] netbox: introduce fiber-async API
2018-04-16 18:39 ` [PATCH 6/8] netbox: introduce fiber-async API Vladislav Shpilevoy
2018-04-23 12:31 ` [tarantool-patches] " Alexander Turenko
@ 2018-04-23 16:44 ` Vladimir Davydov
2018-04-23 18:59 ` [tarantool-patches] " Vladislav Shpilevoy
1 sibling, 1 reply; 32+ messages in thread
From: Vladimir Davydov @ 2018-04-23 16:44 UTC (permalink / raw)
To: Vladislav Shpilevoy; +Cc: tarantool-patches
On Mon, Apr 16, 2018 at 09:39:16PM +0300, Vladislav Shpilevoy wrote:
> Now any netbox call blocks a caller-fiber until a result is read
> from a socket, or time is out. To use it asynchronously it is
> necessary to create a fiber per request. Sometimes it is
> unwanted - for example if RPS is very high (for example, about
> 100k), and latency is about 1 second. Or when it is neccessary
> to send multiple requests in paralles and then collect responses
> (map-reduce).
>
> The patch introduces a new option for all netbox requests:
> is_async. With this option any called netbox method returns
> immediately (but still yields for a moment) a 'future' object.
>
> By a future object a user can check if the request is finalized,
> get a result or error, wait for a timeout, discard a response.
>
> Example of is_async usage:
> future = conn:call(func, {params}, {..., is_async = true})
> -- Do some work ...
> if not future.is_ready() then
> result, err = future:wait_result(timeout)
> end
> -- Or:
> result, error = future:result()
>
> A future:result() and :wait_result() returns either an error or
> a response in the same format, as the sync versions of the called
> methods.
>
> Part of #3107
> ---
> src/box/lua/net_box.lua | 159 ++++++++++++--
> test/box/net.box.result | 519 +++++++++++++++++++++++++++++++++++++++++++++-
> test/box/net.box.test.lua | 186 ++++++++++++++++-
> 3 files changed, 836 insertions(+), 28 deletions(-)
>
> diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
> index 3868cdf1c..96f528963 100644
> --- a/src/box/lua/net_box.lua
> +++ b/src/box/lua/net_box.lua
> @@ -214,12 +214,18 @@ local function create_transport(host, port, user, password, callback,
> local last_error
> local state_cond = fiber.cond() -- signaled when the state changes
>
> - -- requests: requests currently 'in flight', keyed by a request id;
> - -- value refs are weak hence if a client dies unexpectedly,
> - -- GC cleans the mess. Client submits a request and waits on state_cond.
> - -- If the reponse arrives within the timeout, the worker wakes
> - -- client fiber explicitly. Otherwize, wait on state_cond completes and
> - -- the client reports E_TIMEOUT.
> + -- Async requests currently 'in flight', keyed by a request
> + -- id. Value refs are weak hence if a client dies
> + -- unexpectedly, GC cleans the mess. Client either submits a
> + -- request and waits on state_cond, OR makes an async request
> + -- and does not block until a response is received. If the
> + -- request is not async and the reponse arrives within the
> + -- timeout, the worker wakes client fiber explicitly.
> + -- Otherwize, wait on state_cond completes and the client
> + -- reports E_TIMEOUT.
> + -- Async request can not be timed out completely. Instead a
> + -- user must decide when he does not want to wait for
> + -- response anymore.
> local requests = setmetatable({}, { __mode = 'v' })
> local next_request_id = 1
>
> @@ -227,6 +233,94 @@ local function create_transport(host, port, user, password, callback,
> local send_buf = buffer.ibuf(buffer.READAHEAD)
> local recv_buf = buffer.ibuf(buffer.READAHEAD)
>
> + local function wakeup_client(client)
> + if client and client:status() ~= 'dead' then
> + client:wakeup()
> + end
> + end
> +
> + --
> + -- Async request metamethods.
> + --
> + local request_index = {}
> + --
> + -- When an async request is finalized (with ok or error - no
> + -- matter), its 'id' field is nullified by a response
> + -- dispatcher.
> + --
> + function request_index:is_ready()
> + return self.id == nil or worker_fiber == nil
> + end
> + --
> + -- When a request is finished, a result can be got from a
> + -- future object anytime.
> + -- @retval result, nil Success, the response is returned.
> + -- @retval nil, error Error occured.
> + --
> + function request_index:result()
> + if self.errno then
> + return nil, box.error.new({code = self.errno,
> + reason = self.response})
> + elseif not self.id then
> + return self.response
> + elseif not worker_fiber then
> + return nil, box.error.new(E_NO_CONNECTION)
> + else
> + return nil, box.error.new(box.error.PROC_LUA,
> + 'Response is not ready')
> + end
> + end
> + --
> + -- Wait for a response or error max timeout seconds.
> + -- @param timeout Max seconds to wait.
> + -- @retval result, nil Success, the response is returned.
> + -- @retval nil, error Error occured.
> + --
> + function request_index:wait_result(timeout)
> + if timeout then
> + if type(timeout) ~= 'number' or timeout < 0 then
> + error('Usage: future:wait_result(timeout)')
> + end
> + else
> + timeout = TIMEOUT_INFINITY
> + end
> + if not self:is_ready() then
> + -- When a response is ready before timeout, the
> + -- waiting client is waked up spuriously.
> + local old_client = self.client
> + self.client = fiber.self()
> + while timeout > 0 and not self:is_ready() do
> + local ts = fiber.clock()
> + state_cond:wait(timeout)
> + timeout = timeout - (fiber.clock() - ts)
> + end
> + self.client = old_client
> + if not self:is_ready() then
> + return nil, box.error.new(E_TIMEOUT)
> + end
> + -- It is possible that multiple fibers are waiting for
> + -- a result. In such a case a first, who got it, must
> + -- wakeup the previous waiting client. This one wakes
> + -- up another. Another wakes up third one, etc.
> + wakeup_client(old_client)
This is rather difficult for understanding IMO. Can we use a fiber.cond
instead?
> + end
> + return self:result()
> + end
> + --
> + -- Make a connection forget about the response. When it will
> + -- be received, it will be ignored.
> + --
> + function request_index:discard()
> + if self.id then
> + requests[self.id] = nil
> + self.id = nil
> + self.errno = box.error.PROC_LUA
> + self.response = 'Response is discarded'
> + end
> + end
> +
> + local request_mt = { __index = request_index }
> +
> -- STATE SWITCHING --
> local function set_state(new_state, new_errno, new_error)
> state = new_state
^ permalink raw reply [flat|nested] 32+ messages in thread
* Re: [tarantool-patches] Re: [PATCH 6/8] netbox: introduce fiber-async API
2018-04-23 16:44 ` Vladimir Davydov
@ 2018-04-23 18:59 ` Vladislav Shpilevoy
2018-04-24 13:05 ` Vladimir Davydov
0 siblings, 1 reply; 32+ messages in thread
From: Vladislav Shpilevoy @ 2018-04-23 18:59 UTC (permalink / raw)
To: Vladimir Davydov; +Cc: tarantool-patches
Hello. Thanks for review!
On 23/04/2018 19:44, Vladimir Davydov wrote:
> On Mon, Apr 16, 2018 at 09:39:16PM +0300, Vladislav Shpilevoy wrote:
>> Now any netbox call blocks a caller-fiber until a result is read
>> from a socket, or time is out. To use it asynchronously it is
>> necessary to create a fiber per request. Sometimes it is
>> unwanted - for example if RPS is very high (for example, about
>> 100k), and latency is about 1 second. Or when it is neccessary
>> to send multiple requests in paralles and then collect responses
>> (map-reduce).
>>
>> The patch introduces a new option for all netbox requests:
>> is_async. With this option any called netbox method returns
>> immediately (but still yields for a moment) a 'future' object.
>>
>> By a future object a user can check if the request is finalized,
>> get a result or error, wait for a timeout, discard a response.
>>
>> Example of is_async usage:
>> future = conn:call(func, {params}, {..., is_async = true})
>> -- Do some work ...
>> if not future.is_ready() then
>> result, err = future:wait_result(timeout)
>> end
>> -- Or:
>> result, error = future:result()
>>
>> A future:result() and :wait_result() returns either an error or
>> a response in the same format, as the sync versions of the called
>> methods.
>>
>> Part of #3107
>> + -- It is possible that multiple fibers are waiting for
>> + -- a result. In such a case a first, who got it, must
>> + -- wakeup the previous waiting client. This one wakes
>> + -- up another. Another wakes up third one, etc.
>> + wakeup_client(old_client)
>
> This is rather difficult for understanding IMO. Can we use a fiber.cond
> instead?
Sure, we can. Done on the branch.
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index 1f4828a7e..9bbc047d5 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -233,12 +233,6 @@ local function create_transport(host, port, user, password, callback,
local send_buf = buffer.ibuf(buffer.READAHEAD)
local recv_buf = buffer.ibuf(buffer.READAHEAD)
- local function wakeup_client(client)
- if client and client:status() ~= 'dead' then
- client:wakeup()
- end
- end
-
--
-- Async request metamethods.
--
@@ -287,22 +281,14 @@ local function create_transport(host, port, user, password, callback,
if not self:is_ready() then
-- When a response is ready before timeout, the
-- waiting client is waked up prematurely.
- local old_client = self.client
- self.client = fiber.self()
while timeout > 0 and not self:is_ready() do
local ts = fiber.clock()
- state_cond:wait(timeout)
+ self.cond:wait(timeout)
timeout = timeout - (fiber.clock() - ts)
end
- self.client = old_client
if not self:is_ready() then
return nil, box.error.new(E_TIMEOUT)
end
- -- It is possible that multiple fibers are waiting for
- -- a result. In such a case a first, who got it, must
- -- wakeup the previous waiting client. This one wakes
- -- up another. Another wakes up third one, etc.
- wakeup_client(old_client)
end
return self:result()
end
@@ -333,6 +319,7 @@ local function create_transport(host, port, user, password, callback,
request.id = nil
request.errno = new_errno
request.response = new_error
+ request.cond:broadcast()
end
requests = {}
end
@@ -428,11 +415,15 @@ local function create_transport(host, port, user, password, callback,
local id = next_request_id
method_encoder[method](send_buf, id, schema_version, ...)
next_request_id = next_id(id)
+ -- Request has maximum 7 members:
+ -- method, schema_version, buffer, id, cond, errno,
+ -- response.
local request = setmetatable(table_new(0, 7), request_mt)
request.method = method
request.schema_version = schema_version
request.buffer = buffer
request.id = id
+ request.cond = fiber.cond()
requests[id] = request
return request
end
@@ -468,7 +459,7 @@ local function create_transport(host, port, user, password, callback,
assert(body_end == body_end_check, "invalid xrow length")
request.errno = band(status, IPROTO_ERRNO_MASK)
request.response = body[IPROTO_ERROR_KEY]
- wakeup_client(request.client)
+ request.cond:broadcast()
return
end
@@ -479,7 +470,7 @@ local function create_transport(host, port, user, password, callback,
local wpos = buffer:alloc(body_len)
ffi.copy(wpos, body_rpos, body_len)
request.response = tonumber(body_len)
- wakeup_client(request.client)
+ request.cond:broadcast()
return
end
@@ -490,7 +481,7 @@ local function create_transport(host, port, user, password, callback,
request.response, request.errno =
method_decoder[request.method](body[IPROTO_DATA_KEY])
end
- wakeup_client(request.client)
+ request.cond:broadcast()
end
local function new_request_id()
@@ -601,7 +592,7 @@ local function create_transport(host, port, user, password, callback,
request.id = nil
requests[rid] = nil
request.response = response
- wakeup_client(request.client)
+ request.cond:broadcast()
return console_sm(next_id(rid))
end
end
^ permalink raw reply [flat|nested] 32+ messages in thread
* Re: [tarantool-patches] Re: [PATCH 6/8] netbox: introduce fiber-async API
2018-04-23 18:59 ` [tarantool-patches] " Vladislav Shpilevoy
@ 2018-04-24 13:05 ` Vladimir Davydov
0 siblings, 0 replies; 32+ messages in thread
From: Vladimir Davydov @ 2018-04-24 13:05 UTC (permalink / raw)
To: Vladislav Shpilevoy; +Cc: tarantool-patches
On Mon, Apr 23, 2018 at 09:59:29PM +0300, Vladislav Shpilevoy wrote:
> Hello. Thanks for review!
>
> On 23/04/2018 19:44, Vladimir Davydov wrote:
> > On Mon, Apr 16, 2018 at 09:39:16PM +0300, Vladislav Shpilevoy wrote:
> > > Now any netbox call blocks a caller-fiber until a result is read
> > > from a socket, or time is out. To use it asynchronously it is
> > > necessary to create a fiber per request. Sometimes it is
> > > unwanted - for example if RPS is very high (for example, about
> > > 100k), and latency is about 1 second. Or when it is necessary
> > > to send multiple requests in parallel and then collect responses
> > > (map-reduce).
> > >
> > > The patch introduces a new option for all netbox requests:
> > > is_async. With this option any called netbox method returns
> > > immediately (but still yields for a moment) a 'future' object.
> > >
> > > By a future object a user can check if the request is finalized,
> > > get a result or error, wait for a timeout, discard a response.
> > >
> > > Example of is_async usage:
> > > future = conn:call(func, {params}, {..., is_async = true})
> > > -- Do some work ...
> > > if not future:is_ready() then
> > > result, err = future:wait_result(timeout)
> > > end
> > > -- Or:
> > > result, error = future:result()
> > >
> > > future:result() and :wait_result() return either an error or
> > > a response in the same format as the sync versions of the called
> > > methods.
> > >
> > > Part of #3107
> > > + -- It is possible that multiple fibers are waiting for
> > > + -- a result. In such a case a first, who got it, must
> > > + -- wakeup the previous waiting client. This one wakes
> > > + -- up another. Another wakes up third one, etc.
> > > + wakeup_client(old_client)
> >
> > This is rather difficult for understanding IMO. Can we use a fiber.cond
> > instead?
>
> Sure, we can. Done on the branch.
Thanks. The patch looks good to me now.
^ permalink raw reply [flat|nested] 32+ messages in thread
* [PATCH 7/8] netbox: remove schema_version from requests
2018-04-16 18:39 [PATCH 0/8] netbox: introduce fiber-async API Vladislav Shpilevoy
` (5 preceding siblings ...)
2018-04-16 18:39 ` [PATCH 6/8] netbox: introduce fiber-async API Vladislav Shpilevoy
@ 2018-04-16 18:39 ` Vladislav Shpilevoy
2018-05-08 16:06 ` [tarantool-patches] " Konstantin Osipov
2018-04-16 18:39 ` [PATCH 8/8] netbox: implement perform_request via async version Vladislav Shpilevoy
7 siblings, 1 reply; 32+ messages in thread
From: Vladislav Shpilevoy @ 2018-04-16 18:39 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev
Schema_version was used in netbox to update the local box-like
schema. The box-like schema makes it possible to access spaces
and indexes via a connection object.
It was updated each time a response was received from a server
with a schema version not equal to the local value.
But there was no reason why a schema version is needed in a
request. It leads to ER_WRONG_SCHEMA_VERSION error sometimes,
but netbox on this error just resends the same request again. The
same behaviour can be reached by simply not sending any schema
version to a server.
Remove schema_version from request, and just track schema version
changes in responses.
Part of #3351
Part of #3333
Follow up #3107
---
src/box/lua/net_box.c | 105 +++++++++++++++++++++++-----------------------
src/box/lua/net_box.lua | 90 ++++++++++++++++-----------------------
test/box/net.box.result | 6 +--
test/box/net.box.test.lua | 6 +--
4 files changed, 95 insertions(+), 112 deletions(-)
diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c
index db2d2dbb4..04fe70b03 100644
--- a/src/box/lua/net_box.c
+++ b/src/box/lua/net_box.c
@@ -53,7 +53,6 @@ netbox_prepare_request(lua_State *L, struct mpstream *stream, uint32_t r_type)
{
struct ibuf *ibuf = (struct ibuf *) lua_topointer(L, 1);
uint64_t sync = luaL_touint64(L, 2);
- uint64_t schema_version = luaL_touint64(L, 3);
mpstream_init(stream, ibuf, ibuf_reserve_cb, ibuf_alloc_cb,
luamp_error, L);
@@ -67,14 +66,11 @@ netbox_prepare_request(lua_State *L, struct mpstream *stream, uint32_t r_type)
mpstream_advance(stream, fixheader_size);
/* encode header */
- luamp_encode_map(cfg, stream, 3);
+ luamp_encode_map(cfg, stream, 2);
luamp_encode_uint(cfg, stream, IPROTO_SYNC);
luamp_encode_uint(cfg, stream, sync);
- luamp_encode_uint(cfg, stream, IPROTO_SCHEMA_VERSION);
- luamp_encode_uint(cfg, stream, schema_version);
-
luamp_encode_uint(cfg, stream, IPROTO_REQUEST_TYPE);
luamp_encode_uint(cfg, stream, r_type);
@@ -111,9 +107,8 @@ netbox_encode_request(struct mpstream *stream, size_t initial_size)
static int
netbox_encode_ping(lua_State *L)
{
- if (lua_gettop(L) < 3)
- return luaL_error(L, "Usage: netbox.encode_ping(ibuf, sync, "
- "schema_version)");
+ if (lua_gettop(L) < 2)
+ return luaL_error(L, "Usage: netbox.encode_ping(ibuf, sync)");
struct mpstream stream;
size_t svp = netbox_prepare_request(L, &stream, IPROTO_PING);
@@ -124,19 +119,20 @@ netbox_encode_ping(lua_State *L)
static int
netbox_encode_auth(lua_State *L)
{
- if (lua_gettop(L) < 6)
+ if (lua_gettop(L) < 5) {
return luaL_error(L, "Usage: netbox.encode_update(ibuf, sync, "
- "schema_version, user, password, greeting)");
+ "user, password, greeting)");
+ }
struct mpstream stream;
size_t svp = netbox_prepare_request(L, &stream, IPROTO_AUTH);
size_t user_len;
- const char *user = lua_tolstring(L, 4, &user_len);
+ const char *user = lua_tolstring(L, 3, &user_len);
size_t password_len;
- const char *password = lua_tolstring(L, 5, &password_len);
+ const char *password = lua_tolstring(L, 4, &password_len);
size_t salt_len;
- const char *salt = lua_tolstring(L, 6, &salt_len);
+ const char *salt = lua_tolstring(L, 5, &salt_len);
if (salt_len < SCRAMBLE_SIZE)
return luaL_error(L, "Invalid salt");
@@ -160,9 +156,10 @@ netbox_encode_auth(lua_State *L)
static int
netbox_encode_call_impl(lua_State *L, enum iproto_type type)
{
- if (lua_gettop(L) < 5)
+ if (lua_gettop(L) < 4) {
return luaL_error(L, "Usage: netbox.encode_call(ibuf, sync, "
- "schema_version, function_name, args)");
+ "function_name, args)");
+ }
struct mpstream stream;
size_t svp = netbox_prepare_request(L, &stream, type);
@@ -171,13 +168,13 @@ netbox_encode_call_impl(lua_State *L, enum iproto_type type)
/* encode proc name */
size_t name_len;
- const char *name = lua_tolstring(L, 4, &name_len);
+ const char *name = lua_tolstring(L, 3, &name_len);
luamp_encode_uint(cfg, &stream, IPROTO_FUNCTION_NAME);
luamp_encode_str(cfg, &stream, name, name_len);
/* encode args */
luamp_encode_uint(cfg, &stream, IPROTO_TUPLE);
- luamp_encode_tuple(L, cfg, &stream, 5);
+ luamp_encode_tuple(L, cfg, &stream, 4);
netbox_encode_request(&stream, svp);
return 0;
@@ -198,9 +195,10 @@ netbox_encode_call(lua_State *L)
static int
netbox_encode_eval(lua_State *L)
{
- if (lua_gettop(L) < 5)
+ if (lua_gettop(L) < 4) {
return luaL_error(L, "Usage: netbox.encode_eval(ibuf, sync, "
- "schema_version, expr, args)");
+ "expr, args)");
+ }
struct mpstream stream;
size_t svp = netbox_prepare_request(L, &stream, IPROTO_EVAL);
@@ -209,13 +207,13 @@ netbox_encode_eval(lua_State *L)
/* encode expr */
size_t expr_len;
- const char *expr = lua_tolstring(L, 4, &expr_len);
+ const char *expr = lua_tolstring(L, 3, &expr_len);
luamp_encode_uint(cfg, &stream, IPROTO_EXPR);
luamp_encode_str(cfg, &stream, expr, expr_len);
/* encode args */
luamp_encode_uint(cfg, &stream, IPROTO_TUPLE);
- luamp_encode_tuple(L, cfg, &stream, 5);
+ luamp_encode_tuple(L, cfg, &stream, 4);
netbox_encode_request(&stream, svp);
return 0;
@@ -224,21 +222,22 @@ netbox_encode_eval(lua_State *L)
static int
netbox_encode_select(lua_State *L)
{
- if (lua_gettop(L) < 9)
+ if (lua_gettop(L) < 8) {
return luaL_error(L, "Usage netbox.encode_select(ibuf, sync, "
- "schema_version, space_id, index_id, iterator, "
- "offset, limit, key)");
+ "space_id, index_id, iterator, offset, "
+ "limit, key)");
+ }
struct mpstream stream;
size_t svp = netbox_prepare_request(L, &stream, IPROTO_SELECT);
luamp_encode_map(cfg, &stream, 6);
- uint32_t space_id = lua_tonumber(L, 4);
- uint32_t index_id = lua_tonumber(L, 5);
- int iterator = lua_tointeger(L, 6);
- uint32_t offset = lua_tonumber(L, 7);
- uint32_t limit = lua_tonumber(L, 8);
+ uint32_t space_id = lua_tonumber(L, 3);
+ uint32_t index_id = lua_tonumber(L, 4);
+ int iterator = lua_tointeger(L, 5);
+ uint32_t offset = lua_tonumber(L, 6);
+ uint32_t limit = lua_tonumber(L, 7);
/* encode space_id */
luamp_encode_uint(cfg, &stream, IPROTO_SPACE_ID);
@@ -262,7 +261,7 @@ netbox_encode_select(lua_State *L)
/* encode key */
luamp_encode_uint(cfg, &stream, IPROTO_KEY);
- luamp_convert_key(L, cfg, &stream, 9);
+ luamp_convert_key(L, cfg, &stream, 8);
netbox_encode_request(&stream, svp);
return 0;
@@ -271,24 +270,23 @@ netbox_encode_select(lua_State *L)
static inline int
netbox_encode_insert_or_replace(lua_State *L, uint32_t reqtype)
{
- if (lua_gettop(L) < 5)
+ if (lua_gettop(L) < 4) {
return luaL_error(L, "Usage: netbox.encode_insert(ibuf, sync, "
- "schema_version, space_id, tuple)");
- lua_settop(L, 5);
-
+ "space_id, tuple)");
+ }
struct mpstream stream;
size_t svp = netbox_prepare_request(L, &stream, reqtype);
luamp_encode_map(cfg, &stream, 2);
/* encode space_id */
- uint32_t space_id = lua_tonumber(L, 4);
+ uint32_t space_id = lua_tonumber(L, 3);
luamp_encode_uint(cfg, &stream, IPROTO_SPACE_ID);
luamp_encode_uint(cfg, &stream, space_id);
/* encode args */
luamp_encode_uint(cfg, &stream, IPROTO_TUPLE);
- luamp_encode_tuple(L, cfg, &stream, 5);
+ luamp_encode_tuple(L, cfg, &stream, 4);
netbox_encode_request(&stream, svp);
return 0;
@@ -309,9 +307,10 @@ netbox_encode_replace(lua_State *L)
static int
netbox_encode_delete(lua_State *L)
{
- if (lua_gettop(L) < 6)
+ if (lua_gettop(L) < 5) {
return luaL_error(L, "Usage: netbox.encode_delete(ibuf, sync, "
- "schema_version, space_id, index_id, key)");
+ "space_id, index_id, key)");
+ }
struct mpstream stream;
size_t svp = netbox_prepare_request(L, &stream, IPROTO_DELETE);
@@ -319,18 +318,18 @@ netbox_encode_delete(lua_State *L)
luamp_encode_map(cfg, &stream, 3);
/* encode space_id */
- uint32_t space_id = lua_tonumber(L, 4);
+ uint32_t space_id = lua_tonumber(L, 3);
luamp_encode_uint(cfg, &stream, IPROTO_SPACE_ID);
luamp_encode_uint(cfg, &stream, space_id);
/* encode space_id */
- uint32_t index_id = lua_tonumber(L, 5);
+ uint32_t index_id = lua_tonumber(L, 4);
luamp_encode_uint(cfg, &stream, IPROTO_INDEX_ID);
luamp_encode_uint(cfg, &stream, index_id);
/* encode key */
luamp_encode_uint(cfg, &stream, IPROTO_KEY);
- luamp_convert_key(L, cfg, &stream, 6);
+ luamp_convert_key(L, cfg, &stream, 5);
netbox_encode_request(&stream, svp);
return 0;
@@ -339,9 +338,10 @@ netbox_encode_delete(lua_State *L)
static int
netbox_encode_update(lua_State *L)
{
- if (lua_gettop(L) < 7)
+ if (lua_gettop(L) < 6) {
return luaL_error(L, "Usage: netbox.encode_update(ibuf, sync, "
- "schema_version, space_id, index_id, key, ops)");
+ "space_id, index_id, key, ops)");
+ }
struct mpstream stream;
size_t svp = netbox_prepare_request(L, &stream, IPROTO_UPDATE);
@@ -349,12 +349,12 @@ netbox_encode_update(lua_State *L)
luamp_encode_map(cfg, &stream, 5);
/* encode space_id */
- uint32_t space_id = lua_tonumber(L, 4);
+ uint32_t space_id = lua_tonumber(L, 3);
luamp_encode_uint(cfg, &stream, IPROTO_SPACE_ID);
luamp_encode_uint(cfg, &stream, space_id);
/* encode index_id */
- uint32_t index_id = lua_tonumber(L, 5);
+ uint32_t index_id = lua_tonumber(L, 4);
luamp_encode_uint(cfg, &stream, IPROTO_INDEX_ID);
luamp_encode_uint(cfg, &stream, index_id);
@@ -365,12 +365,12 @@ netbox_encode_update(lua_State *L)
/* encode in reverse order for speedup - see luamp_encode() code */
/* encode ops */
luamp_encode_uint(cfg, &stream, IPROTO_TUPLE);
- luamp_encode_tuple(L, cfg, &stream, 7);
+ luamp_encode_tuple(L, cfg, &stream, 6);
lua_pop(L, 1); /* ops */
/* encode key */
luamp_encode_uint(cfg, &stream, IPROTO_KEY);
- luamp_convert_key(L, cfg, &stream, 6);
+ luamp_convert_key(L, cfg, &stream, 5);
netbox_encode_request(&stream, svp);
return 0;
@@ -379,9 +379,10 @@ netbox_encode_update(lua_State *L)
static int
netbox_encode_upsert(lua_State *L)
{
- if (lua_gettop(L) != 6)
+ if (lua_gettop(L) != 5) {
return luaL_error(L, "Usage: netbox.encode_upsert(ibuf, sync, "
- "schema_version, space_id, tuple, ops)");
+ "space_id, tuple, ops)");
+ }
struct mpstream stream;
size_t svp = netbox_prepare_request(L, &stream, IPROTO_UPSERT);
@@ -389,7 +390,7 @@ netbox_encode_upsert(lua_State *L)
luamp_encode_map(cfg, &stream, 4);
/* encode space_id */
- uint32_t space_id = lua_tonumber(L, 4);
+ uint32_t space_id = lua_tonumber(L, 3);
luamp_encode_uint(cfg, &stream, IPROTO_SPACE_ID);
luamp_encode_uint(cfg, &stream, space_id);
@@ -400,12 +401,12 @@ netbox_encode_upsert(lua_State *L)
/* encode in reverse order for speedup - see luamp_encode() code */
/* encode ops */
luamp_encode_uint(cfg, &stream, IPROTO_OPS);
- luamp_encode_tuple(L, cfg, &stream, 6);
+ luamp_encode_tuple(L, cfg, &stream, 5);
lua_pop(L, 1); /* ops */
/* encode tuple */
luamp_encode_uint(cfg, &stream, IPROTO_TUPLE);
- luamp_encode_tuple(L, cfg, &stream, 5);
+ luamp_encode_tuple(L, cfg, &stream, 4);
netbox_encode_request(&stream, svp);
return 0;
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index 96f528963..a2b7b39d2 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -45,7 +45,6 @@ local IPROTO_GREETING_SIZE = 128
local E_UNKNOWN = box.error.UNKNOWN
local E_NO_CONNECTION = box.error.NO_CONNECTION
local E_TIMEOUT = box.error.TIMEOUT
-local E_WRONG_SCHEMA_VERSION = box.error.WRONG_SCHEMA_VERSION
local E_PROC_LUA = box.error.PROC_LUA
-- utility tables
@@ -93,7 +92,7 @@ local method_encoder = {
max = internal.encode_select,
count = internal.encode_call,
-- inject raw data into connection, used by console and tests
- inject = function(buf, id, schema_version, bytes)
+ inject = function(buf, id, bytes)
local ptr = buf:reserve(#bytes)
ffi.copy(ptr, bytes, #bytes)
buf.wpos = ptr + #bytes
@@ -158,7 +157,7 @@ end
-- * implements protocols; concurrent perform_request()-s benefit from
-- multiplexing support in the protocol;
-- * schema-aware (optional) - snoops responses and initiates
--- schema reload when a request fails due to schema version mismatch;
+-- schema reload when a response has a new schema version;
-- * delivers transport events via the callback.
--
-- Transport state machine:
@@ -415,7 +414,7 @@ local function create_transport(host, port, user, password, callback,
-- @retval nil, error Error occured.
-- @retval not nil Future object.
--
- local function perform_async_request(buffer, method, schema_version, ...)
+ local function perform_async_request(buffer, method, ...)
if state ~= 'active' then
return nil, box.error.new({code = last_errno or E_NO_CONNECTION,
reason = last_error})
@@ -426,11 +425,10 @@ local function create_transport(host, port, user, password, callback,
worker_fiber:wakeup()
end
local id = next_request_id
- method_encoder[method](send_buf, id, schema_version, ...)
+ method_encoder[method](send_buf, id, ...)
next_request_id = next_id(id)
- local request = setmetatable(table_new(0, 7), request_mt)
+ local request = setmetatable(table_new(0, 6), request_mt)
request.method = method
- request.schema_version = schema_version
request.buffer = buffer
request.id = id
requests[id] = request
@@ -440,9 +438,9 @@ local function create_transport(host, port, user, password, callback,
--
-- Send a request and wait for response.
--
- local function perform_request(timeout, buffer, method, schema_version, ...)
+ local function perform_request(timeout, buffer, method, ...)
local request, err =
- perform_async_request(buffer, method, schema_version, ...)
+ perform_async_request(buffer, method, ...)
if not request then
return last_errno or E_NO_CONNECTION, last_error
end
@@ -578,7 +576,7 @@ local function create_transport(host, port, user, password, callback,
log.warn("Netbox text protocol support is deprecated since 1.10, "..
"please use require('console').connect() instead")
local setup_delimiter = 'require("console").delimiter("$EOF$")\n'
- method_encoder.inject(send_buf, nil, nil, setup_delimiter)
+ method_encoder.inject(send_buf, nil, setup_delimiter)
local err, response = send_and_recv_console()
if err then
return error_sm(err, response)
@@ -619,7 +617,7 @@ local function create_transport(host, port, user, password, callback,
set_state('fetch_schema')
return iproto_schema_sm()
end
- encode_auth(send_buf, new_request_id(), nil, user, password, salt)
+ encode_auth(send_buf, new_request_id(), user, password, salt)
local err, hdr, body_rpos, body_end = send_and_recv_iproto()
if err then
return error_sm(err, hdr)
@@ -642,11 +640,9 @@ local function create_transport(host, port, user, password, callback,
local select2_id = new_request_id()
local response = {}
-- fetch everything from space _vspace, 2 = ITER_ALL
- encode_select(send_buf, select1_id, nil, VSPACE_ID, 0, 2, 0,
- 0xFFFFFFFF, nil)
+ encode_select(send_buf, select1_id, VSPACE_ID, 0, 2, 0, 0xFFFFFFFF, nil)
-- fetch everything from space _vindex, 2 = ITER_ALL
- encode_select(send_buf, select2_id, nil, VINDEX_ID, 0, 2, 0,
- 0xFFFFFFFF, nil)
+ encode_select(send_buf, select2_id, VINDEX_ID, 0, 2, 0, 0xFFFFFFFF, nil)
schema_version = nil -- any schema_version will do provided that
-- it is consistent across responses
repeat
@@ -692,8 +688,7 @@ local function create_transport(host, port, user, password, callback,
-- Sic: self.schema_version will be updated only after reload.
local body
body, body_end = decode(body_rpos)
- set_state('fetch_schema',
- E_WRONG_SCHEMA_VERSION, body[IPROTO_ERROR_KEY])
+ set_state('fetch_schema')
return iproto_schema_sm(schema_version)
end
return iproto_sm(schema_version)
@@ -977,57 +972,44 @@ function remote_methods:_request(method, opts, ...)
local transport = self._transport
local buffer = opts and opts.buffer
if opts and opts.is_async then
- return transport.perform_async_request(buffer, method, 0, ...)
+ return transport.perform_async_request(buffer, method, ...)
end
- local this_fiber = fiber_self()
- local perform_request = transport.perform_request
- local wait_state = transport.wait_state
- local deadline = nil
+ local deadline
if opts and opts.timeout then
-- conn.space:request(, { timeout = timeout })
deadline = fiber_clock() + opts.timeout
else
-- conn:timeout(timeout).space:request()
-- @deprecated since 1.7.4
- deadline = self._deadlines[this_fiber]
+ deadline = self._deadlines[fiber_self()]
end
- local err, res
- repeat
- local timeout = deadline and max(0, deadline - fiber_clock())
- if self.state ~= 'active' then
- wait_state('active', timeout)
- timeout = deadline and max(0, deadline - fiber_clock())
- end
- err, res = perform_request(timeout, buffer, method,
- self.schema_version, ...)
- if not err then
- return res
- elseif err == E_WRONG_SCHEMA_VERSION then
- err = nil
- end
- until err
- box.error({code = err, reason = res})
+ local timeout = deadline and max(0, deadline - fiber_clock())
+ if self.state ~= 'active' then
+ transport.wait_state('active', timeout)
+ timeout = deadline and max(0, deadline - fiber_clock())
+ end
+ local err, res = transport.perform_request(timeout, buffer, method, ...)
+ if err then
+ box.error({code = err, reason = res})
+ end
+ -- Try to wait until a schema is reloaded if needed.
+ -- Regardless of reloading result, the main response is
+ -- returned, since it does not depend on any schema things.
+ if self.state == 'fetch_schema' then
+ timeout = deadline and max(0, deadline - fiber_clock())
+ transport.wait_state('active', timeout)
+ end
+ return res
end
function remote_methods:ping(opts)
check_remote_arg(self, 'ping')
- local timeout = opts and opts.timeout
- if timeout == nil then
- -- conn:timeout(timeout):ping()
- -- @deprecated since 1.7.4
- local deadline = self._deadlines[fiber_self()]
- timeout = deadline and max(0, deadline - fiber_clock())
- or (opts and opts.timeout)
- end
- local err = self._transport.perform_request(timeout, nil, 'ping',
- self.schema_version)
- return not err or err == E_WRONG_SCHEMA_VERSION
+ return (pcall(self._request, self, 'ping', opts))
end
function remote_methods:reload_schema()
check_remote_arg(self, 'reload_schema')
- self:_request('select', nil, VSPACE_ID, 0, box.index.GE, 0, 0xFFFFFFFF,
- nil)
+ self:ping()
end
-- @deprecated since 1.7.4
@@ -1200,10 +1182,10 @@ function console_methods:eval(line, timeout)
end
if self.protocol == 'Binary' then
local loader = 'return require("console").eval(...)'
- err, res = pr(timeout, nil, 'eval', nil, loader, {line})
+ err, res = pr(timeout, nil, 'eval', loader, {line})
else
assert(self.protocol == 'Lua console')
- err, res = pr(timeout, nil, 'inject', nil, line..'$EOF$\n')
+ err, res = pr(timeout, nil, 'inject', line..'$EOF$\n')
end
if err then
box.error({code = err, reason = res})
diff --git a/test/box/net.box.result b/test/box/net.box.result
index aaa421ec6..3c494696b 100644
--- a/test/box/net.box.result
+++ b/test/box/net.box.result
@@ -28,7 +28,7 @@ function x_select(cn, space_id, index_id, iterator, offset, limit, key, opts)
return cn:_request('select', opts, space_id, index_id, iterator,
offset, limit, key)
end
-function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', nil, '\x80') end
+function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', '\x80') end
test_run:cmd("setopt delimiter ''");
---
...
@@ -2377,7 +2377,7 @@ c.space.test:delete{1}
--
-- Break a connection to test reconnect_after.
--
-_ = c._transport.perform_request(nil, nil, 'inject', nil, '\x80')
+_ = c._transport.perform_request(nil, nil, 'inject', '\x80')
---
...
c.state
@@ -2940,7 +2940,7 @@ c = net:connect(box.cfg.listen, {reconnect_after = 0.01})
future = c:call('long_function', {1, 2, 3}, {is_async = true})
---
...
-_ = c._transport.perform_request(nil, nil, 'inject', nil, '\x80')
+_ = c._transport.perform_request(nil, nil, 'inject', '\x80')
---
...
while not c:is_connected() do fiber.sleep(0.01) end
diff --git a/test/box/net.box.test.lua b/test/box/net.box.test.lua
index 82c538fbe..9a826dc6d 100644
--- a/test/box/net.box.test.lua
+++ b/test/box/net.box.test.lua
@@ -11,7 +11,7 @@ function x_select(cn, space_id, index_id, iterator, offset, limit, key, opts)
return cn:_request('select', opts, space_id, index_id, iterator,
offset, limit, key)
end
-function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', nil, '\x80') end
+function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', '\x80') end
test_run:cmd("setopt delimiter ''");
LISTEN = require('uri').parse(box.cfg.listen)
@@ -965,7 +965,7 @@ c.space.test:delete{1}
--
-- Break a connection to test reconnect_after.
--
-_ = c._transport.perform_request(nil, nil, 'inject', nil, '\x80')
+_ = c._transport.perform_request(nil, nil, 'inject', '\x80')
c.state
while not c:is_connected() do fiber.sleep(0.01) end
c:ping()
@@ -1169,7 +1169,7 @@ future:is_ready()
--
c = net:connect(box.cfg.listen, {reconnect_after = 0.01})
future = c:call('long_function', {1, 2, 3}, {is_async = true})
-_ = c._transport.perform_request(nil, nil, 'inject', nil, '\x80')
+_ = c._transport.perform_request(nil, nil, 'inject', '\x80')
while not c:is_connected() do fiber.sleep(0.01) end
future:wait_result(100)
future:result()
--
2.15.1 (Apple Git-101)
^ permalink raw reply [flat|nested] 32+ messages in thread
* Re: [tarantool-patches] [PATCH 7/8] netbox: remove schema_version from requests
2018-04-16 18:39 ` [PATCH 7/8] netbox: remove schema_version from requests Vladislav Shpilevoy
@ 2018-05-08 16:06 ` Konstantin Osipov
2018-05-08 17:24 ` [tarantool-patches] " Vladislav Shpilevoy
0 siblings, 1 reply; 32+ messages in thread
From: Konstantin Osipov @ 2018-05-08 16:06 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev
* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [18/04/16 21:44]:
> Schema_version was used in netbox to update local box-like
> schema. The box-like schema makes able to access spaces and
> indexes via connection object.
>
> It was updated each time, when a response from a server is
> received with a schema version non-equal to the local value.
>
> But there was no reason why a schema version is needed in a
> request. It leads to ER_WRONG_SCHEMA_VERSION error sometimes,
> but netbox on this error just resends the same request again. The
> same behaviour can be reached with just no sending any schema
> version to a server.
As far as I can see you removed all checks for schema_version and
never re-fetch the schema after a connection is established,
despite what you write in the comment.
Please explain how you ever check that schema has changed when
performing a request.
>
> Remove schema_version from request, and just track schema version
> changes in responses.
>
> Part of #3351
> Part of #3333
> Follow up #3107
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
^ permalink raw reply [flat|nested] 32+ messages in thread
* [tarantool-patches] Re: [PATCH 7/8] netbox: remove schema_version from requests
2018-05-08 16:06 ` [tarantool-patches] " Konstantin Osipov
@ 2018-05-08 17:24 ` Vladislav Shpilevoy
0 siblings, 0 replies; 32+ messages in thread
From: Vladislav Shpilevoy @ 2018-05-08 17:24 UTC (permalink / raw)
To: tarantool-patches, Konstantin Osipov
On 08/05/2018 19:06, Konstantin Osipov wrote:
> * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [18/04/16 21:44]:
>
>> Schema_version was used in netbox to update local box-like
>> schema. The box-like schema makes able to access spaces and
>> indexes via connection object.
>>
>> It was updated each time, when a response from a server is
>> received with a schema version non-equal to the local value.
>>
>> But there was no reason why a schema version is needed in a
>> request. It leads to ER_WRONG_SCHEMA_VERSION error sometimes,
>> but netbox on this error just resends the same request again. The
>> same behaviour can be reached with just no sending any schema
>> version to a server.
>
> As far as I can see you removed all checks for schema_version and
> never re-fetch the schema after a connection is established,
> despite what you write in the comment.
>
> Please explain how you ever check that schema has changed when
> performing a request.
As I already explained to you in Telegram, this check, even before
my commit, was done inside the netbox state machine.
Look at iproto_sm - on each response this function decodes it and
checks the schema version. You may think it is called only once from
iproto_schema_sm, which is called from iproto_auth_sm, but that is wrong.
Iproto_sm is just one state of the state machine. The machine can return
to it again and again like this:
protocol_sm -> iproto_auth_sm -> iproto_schema_sm -> iproto_sm
-> iproto_sm ->
-> iproto_sm ->
...
-> iproto_sm ->
-> iproto_schema_sm ->
-> iproto_sm ->
...
These steps are executed inside the worker fiber.
^ permalink raw reply [flat|nested] 32+ messages in thread
* [PATCH 8/8] netbox: implement perform_request via async version
2018-04-16 18:39 [PATCH 0/8] netbox: introduce fiber-async API Vladislav Shpilevoy
` (6 preceding siblings ...)
2018-04-16 18:39 ` [PATCH 7/8] netbox: remove schema_version from requests Vladislav Shpilevoy
@ 2018-04-16 18:39 ` Vladislav Shpilevoy
2018-04-23 16:47 ` Vladimir Davydov
7 siblings, 1 reply; 32+ messages in thread
From: Vladislav Shpilevoy @ 2018-04-16 18:39 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev
Now that async netbox is introduced, there is no need to keep a
special sync implementation - it can be just an async call + waiting
for a response.
Closes #3107
---
src/box/lua/net_box.lua | 26 +++++++++-----------------
1 file changed, 9 insertions(+), 17 deletions(-)
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index a2b7b39d2..6b33f70c7 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -437,24 +437,16 @@ local function create_transport(host, port, user, password, callback,
--
-- Send a request and wait for response.
+ -- @retval nil, error Error occured.
+ -- @retval not nil Response object.
--
local function perform_request(timeout, buffer, method, ...)
local request, err =
perform_async_request(buffer, method, ...)
if not request then
- return last_errno or E_NO_CONNECTION, last_error
+ return nil, err
end
- request.client = fiber_self()
- local id = request.id
- local deadline = fiber_clock() + (timeout or TIMEOUT_INFINITY)
- repeat
- local timeout = max(0, deadline - fiber_clock())
- if not state_cond:wait(timeout) then
- requests[id] = nil
- return E_TIMEOUT, 'Timeout exceeded'
- end
- until requests[id] == nil -- i.e. completed (beware spurious wakeups)
- return request.errno, request.response
+ return request:wait_result(timeout)
end
local function dispatch_response_iproto(hdr, body_rpos, body_end)
@@ -988,9 +980,9 @@ function remote_methods:_request(method, opts, ...)
transport.wait_state('active', timeout)
timeout = deadline and max(0, deadline - fiber_clock())
end
- local err, res = transport.perform_request(timeout, buffer, method, ...)
+ local res, err = transport.perform_request(timeout, buffer, method, ...)
if err then
- box.error({code = err, reason = res})
+ box.error.raise(err)
end
-- Try to wait until a schema is reloaded if needed.
-- Regardless of reloading result, the main response is
@@ -1182,13 +1174,13 @@ function console_methods:eval(line, timeout)
end
if self.protocol == 'Binary' then
local loader = 'return require("console").eval(...)'
- err, res = pr(timeout, nil, 'eval', loader, {line})
+ res, err = pr(timeout, nil, 'eval', loader, {line})
else
assert(self.protocol == 'Lua console')
- err, res = pr(timeout, nil, 'inject', line..'$EOF$\n')
+ res, err = pr(timeout, nil, 'inject', line..'$EOF$\n')
end
if err then
- box.error({code = err, reason = res})
+ box.error.raise(err)
end
return res[1] or res
end
--
2.15.1 (Apple Git-101)
^ permalink raw reply [flat|nested] 32+ messages in thread
* Re: [PATCH 8/8] netbox: implement perform_request via async version
2018-04-16 18:39 ` [PATCH 8/8] netbox: implement perform_request via async version Vladislav Shpilevoy
@ 2018-04-23 16:47 ` Vladimir Davydov
2018-04-23 19:00 ` [tarantool-patches] " Vladislav Shpilevoy
0 siblings, 1 reply; 32+ messages in thread
From: Vladimir Davydov @ 2018-04-23 16:47 UTC (permalink / raw)
To: Vladislav Shpilevoy; +Cc: tarantool-patches
On Mon, Apr 16, 2018 at 09:39:18PM +0300, Vladislav Shpilevoy wrote:
> When async netbox was introduced, it is not needed to hold a
> special sync implementation - it can be just async call + waiting
> for a response.
>
> Closes #3107
> ---
> src/box/lua/net_box.lua | 26 +++++++++-----------------
> 1 file changed, 9 insertions(+), 17 deletions(-)
I'd squash this one with patch 6.
>
> diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
> index a2b7b39d2..6b33f70c7 100644
> --- a/src/box/lua/net_box.lua
> +++ b/src/box/lua/net_box.lua
> @@ -437,24 +437,16 @@ local function create_transport(host, port, user, password, callback,
>
> --
> -- Send a request and wait for response.
> + -- @retval nil, error Error occured.
> + -- @retval not nil Response object.
> --
> local function perform_request(timeout, buffer, method, ...)
> local request, err =
> perform_async_request(buffer, method, ...)
> if not request then
> - return last_errno or E_NO_CONNECTION, last_error
> + return nil, err
> end
> - request.client = fiber_self()
> - local id = request.id
> - local deadline = fiber_clock() + (timeout or TIMEOUT_INFINITY)
> - repeat
> - local timeout = max(0, deadline - fiber_clock())
> - if not state_cond:wait(timeout) then
> - requests[id] = nil
> - return E_TIMEOUT, 'Timeout exceeded'
> - end
> - until requests[id] == nil -- i.e. completed (beware spurious wakeups)
> - return request.errno, request.response
> + return request:wait_result(timeout)
> end
>
> local function dispatch_response_iproto(hdr, body_rpos, body_end)
> @@ -988,9 +980,9 @@ function remote_methods:_request(method, opts, ...)
> transport.wait_state('active', timeout)
> timeout = deadline and max(0, deadline - fiber_clock())
> end
> - local err, res = transport.perform_request(timeout, buffer, method, ...)
> + local res, err = transport.perform_request(timeout, buffer, method, ...)
> if err then
> - box.error({code = err, reason = res})
> + box.error.raise(err)
> end
> -- Try to wait until a schema is reloaded if needed.
> -- Regardless of reloading result, the main response is
> @@ -1182,13 +1174,13 @@ function console_methods:eval(line, timeout)
> end
> if self.protocol == 'Binary' then
> local loader = 'return require("console").eval(...)'
> - err, res = pr(timeout, nil, 'eval', loader, {line})
> + res, err = pr(timeout, nil, 'eval', loader, {line})
> else
> assert(self.protocol == 'Lua console')
> - err, res = pr(timeout, nil, 'inject', line..'$EOF$\n')
> + res, err = pr(timeout, nil, 'inject', line..'$EOF$\n')
> end
> if err then
> - box.error({code = err, reason = res})
> + box.error.raise(err)
> end
> return res[1] or res
> end
^ permalink raw reply [flat|nested] 32+ messages in thread
* Re: [tarantool-patches] Re: [PATCH 8/8] netbox: implement perform_request via async version
2018-04-23 16:47 ` Vladimir Davydov
@ 2018-04-23 19:00 ` Vladislav Shpilevoy
0 siblings, 0 replies; 32+ messages in thread
From: Vladislav Shpilevoy @ 2018-04-23 19:00 UTC (permalink / raw)
To: Vladimir Davydov; +Cc: tarantool-patches
Hello. Thanks for review!
On 23/04/2018 19:47, Vladimir Davydov wrote:
> On Mon, Apr 16, 2018 at 09:39:18PM +0300, Vladislav Shpilevoy wrote:
>> When async netbox was introduced, it is not needed to hold a
>> special sync implementation - it can be just async call + waiting
>> for a response.
>>
>> Closes #3107
>> ---
>> src/box/lua/net_box.lua | 26 +++++++++-----------------
>> 1 file changed, 9 insertions(+), 17 deletions(-)
>
> I'd squash this one with patch 6.
Done.
^ permalink raw reply [flat|nested] 32+ messages in thread