[Tarantool-patches] [PATCH 20/20] net.box: do not create request object in Lua for sync requests
Vladimir Davydov
vdavydov at tarantool.org
Fri Jul 23 14:07:30 MSK 2021
Creating a request object in Lua is not really necessary for synchronous
requests - we can wait for the request to complete in C code, without
returning to Lua. Since creating a Lua object puts extra pressure on the
garbage collector, we'd better avoid it when we can.
---
src/box/lua/net_box.c | 67 +++++++++++++++++++++++++++++------------
src/box/lua/net_box.lua | 39 ++++++++++++++++--------
2 files changed, 74 insertions(+), 32 deletions(-)
diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c
index 844a1de613f2..684091cf898f 100644
--- a/src/box/lua/net_box.c
+++ b/src/box/lua/net_box.c
@@ -1610,7 +1610,7 @@ netbox_new_registry(struct lua_State *L)
* Writes a request to the send buffer and registers the request object
* ('future') that can be used for waiting for a response.
*
- * Takes the following arguments:
+ * Takes the following values from Lua stack starting at index idx:
* - requests: registry to register the new request with
* - send_buf: buffer (ibuf) to write the encoded request to
* - buffer: buffer (ibuf) to write the result to or nil
@@ -1621,43 +1621,71 @@ netbox_new_registry(struct lua_State *L)
* - format: tuple format to use for decoding the body or nil
* - ...: method-specific arguments passed to the encoder
*/
-static int
-netbox_perform_async_request(struct lua_State *L)
+static void
+netbox_perform_async_request_impl(struct lua_State *L, int idx,
+ struct netbox_request *request)
{
- struct netbox_request *request = lua_newuserdata(L, sizeof(*request));
- netbox_request_create(request);
- luaL_getmetatable(L, netbox_request_typename);
- lua_setmetatable(L, -2);
-
/* Encode and write the request to the send buffer. */
- struct netbox_registry *registry = luaT_check_netbox_registry(L, 1);
- struct ibuf *send_buf = (struct ibuf *) lua_topointer(L, 2);
- enum netbox_method method = lua_tointeger(L, 5);
+ struct netbox_registry *registry = luaT_check_netbox_registry(L, idx);
+ struct ibuf *send_buf = (struct ibuf *) lua_topointer(L, idx + 1);
+ enum netbox_method method = lua_tointeger(L, idx + 4);
assert(method < netbox_method_MAX);
uint64_t sync = registry->next_sync++;
- netbox_encode_method(L, 9, method, send_buf, sync);
+ netbox_encode_method(L, idx + 8, method, send_buf, sync);
/* Initialize and register the request object. */
request->method = method;
request->sync = sync;
- request->buffer = (struct ibuf *) lua_topointer(L, 3);
- lua_pushvalue(L, 3);
+ request->buffer = (struct ibuf *) lua_topointer(L, idx + 2);
+ lua_pushvalue(L, idx + 2);
request->buffer_ref = luaL_ref(L, LUA_REGISTRYINDEX);
- request->skip_header = lua_toboolean(L, 4);
- lua_pushvalue(L, 6);
+ request->skip_header = lua_toboolean(L, idx + 3);
+ lua_pushvalue(L, idx + 5);
request->on_push_ref = luaL_ref(L, LUA_REGISTRYINDEX);
- lua_pushvalue(L, 7);
+ lua_pushvalue(L, idx + 6);
request->on_push_ctx_ref = luaL_ref(L, LUA_REGISTRYINDEX);
- if (!lua_isnil(L, 8))
- request->format = lbox_check_tuple_format(L, 8);
+ if (!lua_isnil(L, idx + 7))
+ request->format = lbox_check_tuple_format(L, idx + 7);
else
request->format = tuple_format_runtime;
tuple_format_ref(request->format);
if (netbox_request_register(request, registry) != 0)
luaT_error(L);
+}
+
+static int
+netbox_perform_async_request(struct lua_State *L)
+{
+ struct netbox_request *request = lua_newuserdata(L, sizeof(*request));
+ netbox_request_create(request);
+ luaL_getmetatable(L, netbox_request_typename);
+ lua_setmetatable(L, -2);
+ netbox_perform_async_request_impl(L, 1, request);
return 1;
}
+static int
+netbox_perform_request(struct lua_State *L)
+{
+ double timeout = (!lua_isnil(L, 1) ?
+ lua_tonumber(L, 1) : TIMEOUT_INFINITY);
+ struct netbox_request request;
+ netbox_request_create(&request);
+ netbox_perform_async_request_impl(L, 2, &request);
+ while (!netbox_request_is_ready(&request)) {
+ if (!netbox_request_wait(&request, &timeout)) {
+ netbox_request_unregister(&request);
+ netbox_request_destroy(&request);
+ luaL_testcancel(L);
+ diag_set(ClientError, ER_TIMEOUT);
+ return luaT_push_nil_and_error(L);
+ }
+ }
+ int ret = netbox_request_push_result(&request, L);
+ netbox_request_destroy(&request);
+ return ret;
+}
+
/*
* Given a request registry and a response header, decodes the response and
* either completes the request or invokes the on-push trigger, depending on
@@ -2028,6 +2056,7 @@ luaopen_net_box(struct lua_State *L)
{ "decode_greeting",netbox_decode_greeting },
{ "new_registry", netbox_new_registry },
{ "perform_async_request", netbox_perform_async_request },
+ { "perform_request",netbox_perform_request },
{ "iproto_auth", netbox_iproto_auth },
{ "iproto_schema", netbox_iproto_schema },
{ "iproto_loop", netbox_iproto_loop },
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index 55d172a1f6b9..d6367a848aa1 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -253,19 +253,27 @@ local function create_transport(host, port, user, password, callback,
end
end
- --
- -- Send a request and do not wait for response.
- -- @retval nil, error Error occured.
- -- @retval not nil Future object.
- --
- local function perform_async_request(buffer, skip_header, method, on_push,
- on_push_ctx, format, ...)
+ local function check_active()
if state ~= 'active' and state ~= 'fetch_schema' then
local code = last_errno or E_NO_CONNECTION
local msg = last_error or
string.format('Connection is not established, state is "%s"',
state)
- return nil, box.error.new({code = code, reason = msg})
+ return box.error.new({code = code, reason = msg})
+ end
+ return nil
+ end
+
+ --
+ -- Send a request and do not wait for response.
+ -- @retval nil, error Error occurred.
+ -- @retval not nil Future object.
+ --
+ local function perform_async_request(buffer, skip_header, method, on_push,
+ on_push_ctx, format, ...)
+ local err = check_active()
+ if err then
+ return nil, err
end
-- alert worker to notify it of the queued outgoing data;
-- if the buffer wasn't empty, assume the worker was already alerted
@@ -284,13 +292,18 @@ local function create_transport(host, port, user, password, callback,
--
local function perform_request(timeout, buffer, skip_header, method,
on_push, on_push_ctx, format, ...)
- local request, err =
- perform_async_request(buffer, skip_header, method, on_push,
- on_push_ctx, format, ...)
- if not request then
+ local err = check_active()
+ if err then
return nil, err
end
- return request:wait_result(timeout)
+ -- alert worker to notify it of the queued outgoing data;
+ -- if the buffer wasn't empty, assume the worker was already alerted
+ if send_buf:size() == 0 then
+ worker_fiber:wakeup()
+ end
+ return internal.perform_request(timeout, requests, send_buf, buffer,
+ skip_header, method, on_push,
+ on_push_ctx, format, ...)
end
-- PROTOCOL STATE MACHINE (WORKER FIBER) --
--
2.25.1
More information about the Tarantool-patches
mailing list