[Tarantool-patches] [PATCH 13/20] net.box: rewrite send_and_recv_{iproto, console} in C
Vladimir Davydov
vdavydov at tarantool.org
Fri Jul 23 14:07:23 MSK 2021
Basically, this patch converts the above functions line-by-line from Lua
to C. This is a step towards rewriting performance-critical parts of
net.box in C.
Note, Lua code expects send_and_recv functions to return errno, errmsg on
error and nil, ... on success, while it's more convenient to return an
error object from C code, so we do the conversion in Lua wrappers.
---
src/box/lua/net_box.c | 155 ++++++++++++++++++++++++++++------------
src/box/lua/net_box.lua | 46 +++---------
2 files changed, 121 insertions(+), 80 deletions(-)
diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c
index e88db6323afa..12d82738a050 100644
--- a/src/box/lua/net_box.c
+++ b/src/box/lua/net_box.c
@@ -451,9 +451,9 @@ netbox_decode_greeting(lua_State *L)
}
/**
- * communicate(fd, send_buf, recv_buf, limit_or_boundary, timeout)
- * -> errno, error
- * -> nil, limit/boundary_pos
+ * Reads data from the given socket until the limit or boundary is reached.
+ * Returns 0 and sets *limit_or_boundary_pos to limit/boundary_pos on success.
+ * On error returns -1 and sets diag.
*
* The need for this function arises from not wanting to
* have more than one watcher for a single fd, and thus issue
@@ -465,62 +465,45 @@ netbox_decode_greeting(lua_State *L)
* interaction.
*/
static int
-netbox_communicate(lua_State *L)
+netbox_communicate(int fd, struct ibuf *send_buf, struct ibuf *recv_buf,
+ size_t limit, const void *boundary, size_t boundary_len,
+ double timeout, size_t *limit_or_boundary_pos)
{
- uint32_t fd = lua_tonumber(L, 1);
const int NETBOX_READAHEAD = 16320;
- struct ibuf *send_buf = (struct ibuf *) lua_topointer(L, 2);
- struct ibuf *recv_buf = (struct ibuf *) lua_topointer(L, 3);
-
- /* limit or boundary */
- size_t limit = SIZE_MAX;
- const void *boundary = NULL;
- size_t boundary_len;
-
- if (lua_type(L, 4) == LUA_TSTRING)
- boundary = lua_tolstring(L, 4, &boundary_len);
- else
- limit = lua_tonumber(L, 4);
-
- /* timeout */
- ev_tstamp timeout = TIMEOUT_INFINITY;
- if (lua_type(L, 5) == LUA_TNUMBER)
- timeout = lua_tonumber(L, 5);
if (timeout < 0) {
- lua_pushinteger(L, ER_TIMEOUT);
- lua_pushstring(L, "Timeout exceeded");
- return 2;
+ diag_set(ClientError, ER_TIMEOUT);
+ return -1;
}
int revents = COIO_READ;
while (true) {
/* reader serviced first */
check_limit:
if (ibuf_used(recv_buf) >= limit) {
- lua_pushnil(L);
- lua_pushinteger(L, (lua_Integer)limit);
- return 2;
+ *limit_or_boundary_pos = limit;
+ return 0;
}
const char *p;
if (boundary != NULL && (p = memmem(
recv_buf->rpos,
ibuf_used(recv_buf),
boundary, boundary_len)) != NULL) {
- lua_pushnil(L);
- lua_pushinteger(L, (lua_Integer)(
- p - recv_buf->rpos));
- return 2;
+ *limit_or_boundary_pos = p - recv_buf->rpos;
+ return 0;
}
while (revents & COIO_READ) {
void *p = ibuf_reserve(recv_buf, NETBOX_READAHEAD);
- if (p == NULL)
- luaL_error(L, "out of memory");
+ if (p == NULL) {
+ diag_set(OutOfMemory, NETBOX_READAHEAD,
+ "ibuf", "recv_buf");
+ return -1;
+ }
ssize_t rc = recv(
fd, recv_buf->wpos, ibuf_unused(recv_buf), 0);
if (rc == 0) {
- lua_pushinteger(L, ER_NO_CONNECTION);
- lua_pushstring(L, "Peer closed");
- return 2;
+ box_error_raise(ER_NO_CONNECTION,
+ "Peer closed");
+ return -1;
} if (rc > 0) {
recv_buf->wpos += rc;
goto check_limit;
@@ -545,19 +528,100 @@ check_limit:
ERROR_INJECT_YIELD(ERRINJ_NETBOX_IO_DELAY);
revents = coio_wait(fd, EV_READ | (ibuf_used(send_buf) != 0 ?
EV_WRITE : 0), timeout);
- luaL_testcancel(L);
+ if (fiber_is_cancelled()) {
+ diag_set(FiberIsCancelled);
+ return -1;
+ }
timeout = deadline - ev_monotonic_now(loop());
timeout = MAX(0.0, timeout);
if (revents == 0 && timeout == 0.0) {
- lua_pushinteger(L, ER_TIMEOUT);
- lua_pushstring(L, "Timeout exceeded");
- return 2;
+ diag_set(ClientError, ER_TIMEOUT);
+ return -1;
}
}
handle_error:
- lua_pushinteger(L, ER_NO_CONNECTION);
- lua_pushstring(L, strerror(errno));
- return 2;
+ box_error_raise(ER_NO_CONNECTION, "%s", strerror(errno));
+ return -1;
+}
+
+/*
+ * Sends and receives data over an iproto connection.
+ * Takes socket fd, send_buf (ibuf), recv_buf (ibuf), timeout.
+ * On success returns header (table), body_rpos (char *), body_end (char *).
+ * On error returns nil, error.
+ */
+static int
+netbox_send_and_recv_iproto(lua_State *L)
+{
+ int fd = lua_tointeger(L, 1);
+ struct ibuf *send_buf = (struct ibuf *) lua_topointer(L, 2);
+ struct ibuf *recv_buf = (struct ibuf *) lua_topointer(L, 3);
+ double timeout = (!lua_isnoneornil(L, 4) ?
+ lua_tonumber(L, 4) : TIMEOUT_INFINITY);
+ while (true) {
+ size_t required;
+ size_t data_len = ibuf_used(recv_buf);
+ size_t fixheader_size = mp_sizeof_uint(UINT32_MAX);
+ if (data_len < fixheader_size) {
+ required = fixheader_size;
+ } else {
+ /* PWN! insufficient input validation */
+ const char *bufpos = recv_buf->rpos;
+ const char *rpos = bufpos;
+ size_t len = mp_decode_uint(&rpos);
+ required = (rpos - bufpos) + len;
+ if (data_len >= required) {
+ const char *body_end = rpos + len;
+ const char *body_rpos = rpos;
+ luamp_decode(L, cfg, &body_rpos);
+ *(const char **)luaL_pushcdata(
+ L, CTID_CONST_CHAR_PTR) = body_rpos;
+ *(const char **)luaL_pushcdata(
+ L, CTID_CONST_CHAR_PTR) = body_end;
+ recv_buf->rpos = (char *)body_end;
+ return 3;
+ }
+ }
+ size_t unused;
+ double deadline = fiber_clock() + timeout;
+ if (netbox_communicate(fd, send_buf, recv_buf,
+ /*limit=*/required,
+ /*boundary=*/NULL,
+ /*boundary_len=*/0,
+ timeout, &unused) != 0) {
+ luaL_testcancel(L);
+ return luaT_push_nil_and_error(L);
+ }
+ timeout = deadline - fiber_clock();
+ }
+}
+
+/*
+ * Sends and receives data over a console connection.
+ * Takes socket fd, send_buf (ibuf), recv_buf (ibuf), timeout.
+ * On success returns response (string).
+ * On error returns nil, error.
+ */
+static int
+netbox_send_and_recv_console(lua_State *L)
+{
+ int fd = lua_tointeger(L, 1);
+ struct ibuf *send_buf = (struct ibuf *) lua_topointer(L, 2);
+ struct ibuf *recv_buf = (struct ibuf *) lua_topointer(L, 3);
+ double timeout = (!lua_isnoneornil(L, 4) ?
+ lua_tonumber(L, 4) : TIMEOUT_INFINITY);
+ const char delim[] = "\n...\n";
+ size_t delim_len = sizeof(delim) - 1;
+ size_t delim_pos;
+ if (netbox_communicate(fd, send_buf, recv_buf, /*limit=*/SIZE_MAX,
+ delim, delim_len, timeout, &delim_pos) != 0) {
+
+ luaL_testcancel(L);
+ return luaT_push_nil_and_error(L);
+ }
+ lua_pushlstring(L, recv_buf->rpos, delim_pos + delim_len);
+ recv_buf->rpos += delim_pos + delim_len;
+ return 1;
}
static void
@@ -1150,7 +1214,8 @@ luaopen_net_box(struct lua_State *L)
{ "decode_greeting",netbox_decode_greeting },
{ "decode_method", netbox_decode_method },
{ "decode_error", netbox_decode_error },
- { "communicate", netbox_communicate },
+ { "send_and_recv_iproto", netbox_send_and_recv_iproto },
+ { "send_and_recv_console", netbox_send_and_recv_console },
{ NULL, NULL}
};
/* luaL_register_module polutes _G */
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index d7394b088752..0ad6cac022f2 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -22,7 +22,6 @@ local check_index_arg = box.internal.check_index_arg
local check_space_arg = box.internal.check_space_arg
local check_primary_index = box.internal.check_primary_index
-local communicate = internal.communicate
local encode_auth = internal.encode_auth
local encode_method = internal.encode_method
local decode_greeting = internal.decode_greeting
@@ -582,46 +581,23 @@ local function create_transport(host, port, user, password, callback,
end
-- IO (WORKER FIBER) --
- local function send_and_recv(limit_or_boundary, timeout)
- return communicate(connection:fd(), send_buf, recv_buf,
- limit_or_boundary, timeout)
- end
-
local function send_and_recv_iproto(timeout)
- local data_len = recv_buf.wpos - recv_buf.rpos
- local required
- if data_len < 5 then
- required = 5
- else
- -- PWN! insufficient input validation
- local bufpos = recv_buf.rpos
- local len, rpos = decode(bufpos)
- required = (rpos - bufpos) + len
- if data_len >= required then
- local body_end = rpos + len
- local hdr, body_rpos = decode(rpos)
- recv_buf.rpos = body_end
- return nil, hdr, body_rpos, body_end
- end
+ local hdr, body_rpos, body_end = internal.send_and_recv_iproto(
+ connection:fd(), send_buf, recv_buf, timeout)
+ if not hdr then
+ local err = body_rpos
+ return err.code, err.message
end
- local deadline = fiber_clock() + (timeout or TIMEOUT_INFINITY)
- local err, extra = send_and_recv(required, timeout)
- if err then
- return err, extra
- end
- return send_and_recv_iproto(max(0, deadline - fiber_clock()))
+ return nil, hdr, body_rpos, body_end
end
local function send_and_recv_console(timeout)
- local delim = '\n...\n'
- local err, delim_pos = send_and_recv(delim, timeout)
- if err then
- return err, delim_pos
- else
- local response = ffi.string(recv_buf.rpos, delim_pos + #delim)
- recv_buf.rpos = recv_buf.rpos + delim_pos + #delim
- return nil, response
+ local response, err = internal.send_and_recv_console(
+ connection:fd(), send_buf, recv_buf, timeout)
+ if not response then
+ return err.code, err.message
end
+ return nil, response
end
-- PROTOCOL STATE MACHINE (WORKER FIBER) --
--
2.25.1
More information about the Tarantool-patches
mailing list