[Tarantool-patches] [PATCH 17/20] net.box: rewrite console handlers in C
Vladimir Davydov
vdavydov at tarantool.org
Fri Jul 23 14:07:27 MSK 2021
Strictly speaking, it's not necessary, because console performance is
not a problem. We do this for consistency with iproto.
---
src/box/lua/net_box.c | 135 ++++++++++++++++++++++++++--------------
src/box/lua/net_box.lua | 49 +++++----------
2 files changed, 106 insertions(+), 78 deletions(-)
diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c
index 1a615797d485..85a45c54b979 100644
--- a/src/box/lua/net_box.c
+++ b/src/box/lua/net_box.c
@@ -850,30 +850,25 @@ netbox_send_and_recv_iproto(lua_State *L)
/*
* Sends and receives data over a console connection.
- * Takes socket fd, send_buf (ibuf), recv_buf (ibuf), timeout.
- * On success returns response (string).
- * On error returns nil, error.
+ * Returns a pointer to a response string and its len.
+ * On error returns NULL.
*/
-static int
-netbox_send_and_recv_console(lua_State *L)
+static const char *
+netbox_send_and_recv_console(int fd, struct ibuf *send_buf,
+ struct ibuf *recv_buf, double timeout,
+ size_t *response_len)
{
- int fd = lua_tointeger(L, 1);
- struct ibuf *send_buf = (struct ibuf *) lua_topointer(L, 2);
- struct ibuf *recv_buf = (struct ibuf *) lua_topointer(L, 3);
- double timeout = (!lua_isnoneornil(L, 4) ?
- lua_tonumber(L, 4) : TIMEOUT_INFINITY);
const char delim[] = "\n...\n";
size_t delim_len = sizeof(delim) - 1;
size_t delim_pos;
if (netbox_communicate(fd, send_buf, recv_buf, /*limit=*/SIZE_MAX,
delim, delim_len, timeout, &delim_pos) != 0) {
-
- luaL_testcancel(L);
- return luaT_push_nil_and_error(L);
+ return NULL;
}
- lua_pushlstring(L, recv_buf->rpos, delim_pos + delim_len);
+ const char *response = recv_buf->rpos;
recv_buf->rpos += delim_pos + delim_len;
- return 1;
+ *response_len = delim_pos + delim_len;
+ return response;
}
static void
@@ -1456,23 +1451,6 @@ luaT_netbox_registry_new_id(struct lua_State *L)
return 1;
}
-/*
- * Returns the next id (sync) without reserving it.
- * If called with an argument, returns the id following its value.
- */
-static int
-luaT_netbox_registry_next_id(struct lua_State *L)
-{
- struct netbox_registry *registry = luaT_check_netbox_registry(L, 1);
- uint64_t next_sync;
- if (lua_isnoneornil(L, 2))
- next_sync = registry->next_sync;
- else
- next_sync = luaL_touint64(L, 2) + 1;
- luaL_pushuint64(L, next_sync);
- return 1;
-}
-
static int
luaT_netbox_registry_reset(struct lua_State *L)
{
@@ -1802,24 +1780,93 @@ netbox_dispatch_response_iproto(struct lua_State *L)
/*
* Given a request registry, request id (sync), and a response string, assigns
* the response to the request and completes it.
+ *
+ * Lua stack is used for temporarily storing the response string before getting
+ * a reference to it.
*/
-static int
-netbox_dispatch_response_console(struct lua_State *L)
+static void
+netbox_dispatch_response_console(struct lua_State *L,
+ struct netbox_registry *registry,
+ uint64_t sync, const char *response,
+ size_t response_len)
{
- struct netbox_registry *registry = luaT_check_netbox_registry(L, 1);
- uint64_t sync = luaL_touint64(L, 2);
struct netbox_request *request = netbox_registry_lookup(registry, sync);
if (request == NULL) {
/* Nobody is waiting for the response. */
- return 0;
+ return;
}
- /*
- * The response is the last argument of this function so it's already
- * on the top of the Lua stack.
- */
+ lua_pushlstring(L, response, response_len);
netbox_request_set_result(request, luaL_ref(L, LUA_REGISTRYINDEX));
netbox_request_complete(request);
+}
+
+/*
+ * Sets up console delimiter. Should be called before serving any requests.
+ * Takes socket fd, send_buf (ibuf), recv_buf (ibuf), timeout.
+ * Returns none on success, error on failure.
+ */
+static int
+netbox_console_setup(struct lua_State *L)
+{
+ static const char setup_delimiter_cmd[] =
+ "require('console').delimiter('$EOF$')\n";
+ static const size_t setup_delimiter_cmd_len =
+ sizeof(setup_delimiter_cmd) - 1;
+ static const char ok_response[] = "---\n...\n";
+ static const size_t ok_response_len = sizeof(ok_response) - 1;
+ int fd = lua_tointeger(L, 1);
+ struct ibuf *send_buf = (struct ibuf *) lua_topointer(L, 2);
+ struct ibuf *recv_buf = (struct ibuf *) lua_topointer(L, 3);
+ double timeout = (!lua_isnoneornil(L, 4) ?
+ lua_tonumber(L, 4) : TIMEOUT_INFINITY);
+ void *wpos = ibuf_alloc(send_buf, setup_delimiter_cmd_len);
+ if (wpos == NULL)
+ return luaL_error(L, "out of memory");
+ memcpy(wpos, setup_delimiter_cmd, setup_delimiter_cmd_len);
+ size_t response_len;
+ const char *response = netbox_send_and_recv_console(
+ fd, send_buf, recv_buf, timeout, &response_len);
+ if (response == NULL) {
+ luaL_testcancel(L);
+ goto error;
+ }
+ if (strncmp(response, ok_response, ok_response_len) != 0) {
+ box_error_raise(ER_NO_CONNECTION, "Unexpected response");
+ goto error;
+ }
return 0;
+error:
+ luaT_pusherror(L, box_error_last());
+ return 1;
+}
+
+/*
+ * Processes console requests in a loop until an error.
+ * Takes request registry, socket fd, send_buf (ibuf), recv_buf (ibuf), timeout.
+ * Returns the error that broke the loop.
+ */
+static int
+netbox_console_loop(struct lua_State *L)
+{
+ struct netbox_registry *registry = luaT_check_netbox_registry(L, 1);
+ int fd = lua_tointeger(L, 2);
+ struct ibuf *send_buf = (struct ibuf *) lua_topointer(L, 3);
+ struct ibuf *recv_buf = (struct ibuf *) lua_topointer(L, 4);
+ double timeout = (!lua_isnoneornil(L, 5) ?
+ lua_tonumber(L, 5) : TIMEOUT_INFINITY);
+ uint64_t sync = registry->next_sync;
+ while (true) {
+ size_t response_len;
+ const char *response = netbox_send_and_recv_console(
+ fd, send_buf, recv_buf, timeout, &response_len);
+ if (response == NULL) {
+ luaL_testcancel(L);
+ luaT_pusherror(L, box_error_last());
+ return 1;
+ }
+ netbox_dispatch_response_console(L, registry, sync++,
+ response, response_len);
+ }
}
int
@@ -1828,7 +1875,6 @@ luaopen_net_box(struct lua_State *L)
static const struct luaL_Reg netbox_registry_meta[] = {
{ "__gc", luaT_netbox_registry_gc },
{ "new_id", luaT_netbox_registry_new_id },
- { "next_id", luaT_netbox_registry_next_id },
{ "reset", luaT_netbox_registry_reset },
{ NULL, NULL }
};
@@ -1850,12 +1896,11 @@ luaopen_net_box(struct lua_State *L)
{ "encode_method", netbox_encode_method },
{ "decode_greeting",netbox_decode_greeting },
{ "send_and_recv_iproto", netbox_send_and_recv_iproto },
- { "send_and_recv_console", netbox_send_and_recv_console },
{ "new_registry", netbox_new_registry },
{ "new_request", netbox_new_request },
{ "dispatch_response_iproto", netbox_dispatch_response_iproto },
- { "dispatch_response_console",
- netbox_dispatch_response_console },
+ { "console_setup", netbox_console_setup },
+ { "console_loop", netbox_console_loop },
{ NULL, NULL}
};
/* luaL_register_module polutes _G */
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index 0643477cbc9c..0a21c1341117 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -327,10 +327,6 @@ local function create_transport(host, port, user, password, callback,
body_rpos, body_end)
end
- local function dispatch_response_console(rid, response)
- internal.dispatch_response_console(requests, rid, response)
- end
-
-- IO (WORKER FIBER) --
local function send_and_recv_iproto(timeout)
local hdr, body_rpos, body_end = internal.send_and_recv_iproto(
@@ -342,22 +338,14 @@ local function create_transport(host, port, user, password, callback,
return nil, hdr, body_rpos, body_end
end
- local function send_and_recv_console(timeout)
- local response, err = internal.send_and_recv_console(
- connection:fd(), send_buf, recv_buf, timeout)
- if not response then
- return err.code, err.message
- end
- return nil, response
- end
-
-- PROTOCOL STATE MACHINE (WORKER FIBER) --
--
-- The sm is implemented as a collection of functions performing
-- tail-recursive calls to each other. Yep, Lua optimizes
-- such calls, and yep, this is the canonical way to implement
-- a state machine in Lua.
- local console_sm, iproto_auth_sm, iproto_schema_sm, iproto_sm, error_sm
+ local console_setup_sm, console_sm, iproto_auth_sm, iproto_schema_sm,
+ iproto_sm, error_sm
--
-- Protocol_sm is a core function of netbox. It calls all
@@ -376,19 +364,7 @@ local function create_transport(host, port, user, password, callback,
end
-- @deprecated since 1.10
if greeting.protocol == 'Lua console' then
- log.warn("Netbox text protocol support is deprecated since 1.10, "..
- "please use require('console').connect() instead")
- local setup_delimiter = 'require("console").delimiter("$EOF$")\n'
- encode_method(M_INJECT, send_buf, nil, setup_delimiter)
- local err, response = send_and_recv_console()
- if err then
- return error_sm(err, response)
- elseif response ~= '---\n...\n' then
- return error_sm(E_NO_CONNECTION, 'Unexpected response')
- end
- local rid = requests:next_id()
- set_state('active')
- return console_sm(rid)
+ return console_setup_sm()
elseif greeting.protocol == 'Binary' then
return iproto_auth_sm(greeting.salt)
else
@@ -397,14 +373,21 @@ local function create_transport(host, port, user, password, callback,
end
end
- console_sm = function(rid)
- local err, response = send_and_recv_console()
+ console_setup_sm = function()
+ log.warn("Netbox text protocol support is deprecated since 1.10, "..
+ "please use require('console').connect() instead")
+ local err = internal.console_setup(connection:fd(), send_buf, recv_buf)
if err then
- return error_sm(err, response)
- else
- dispatch_response_console(rid, response)
- return console_sm(requests:next_id(rid))
+ return error_sm(err.code, err.message)
end
+ set_state('active')
+ return console_sm()
+ end
+
+ console_sm = function()
+ local err = internal.console_loop(requests, connection:fd(),
+ send_buf, recv_buf)
+ return error_sm(err.code, err.message)
end
iproto_auth_sm = function(salt)
--
2.25.1
More information about the Tarantool-patches
mailing list