Tarantool development patches archive
 help / color / mirror / Atom feed
From: Vladimir Davydov via Tarantool-patches <tarantool-patches@dev.tarantool.org>
To: tarantool-patches@dev.tarantool.org
Subject: [Tarantool-patches] [PATCH 13/20] net.box: rewrite send_and_recv_{iproto, console} in C
Date: Fri, 23 Jul 2021 14:07:23 +0300	[thread overview]
Message-ID: <5bc68e715031c04ebb022c70cf4ff27c0939598c.1627024646.git.vdavydov@tarantool.org> (raw)
In-Reply-To: <cover.1627024646.git.vdavydov@tarantool.org>

Basically, this patch converts the above functions line-by-line from Lua
to C. This is a step towards rewriting performance-critical parts of
net.box in C.

Note, Lua code expects the send_and_recv functions to return errno, errmsg
on error and nil, ... on success, while it's more convenient to return an
error object from C code, so we do the conversion in Lua wrappers.
---
 src/box/lua/net_box.c   | 155 ++++++++++++++++++++++++++++------------
 src/box/lua/net_box.lua |  46 +++---------
 2 files changed, 121 insertions(+), 80 deletions(-)

diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c
index e88db6323afa..12d82738a050 100644
--- a/src/box/lua/net_box.c
+++ b/src/box/lua/net_box.c
@@ -451,9 +451,9 @@ netbox_decode_greeting(lua_State *L)
 }
 
 /**
- * communicate(fd, send_buf, recv_buf, limit_or_boundary, timeout)
- *  -> errno, error
- *  -> nil, limit/boundary_pos
+ * Reads data from the given socket until the limit or boundary is reached.
+ * Returns 0 and sets *limit_or_boundary_pos to limit/boundary_pos on success.
+ * On error returns -1 and sets diag.
  *
  * The need for this function arises from not wanting to
  * have more than one watcher for a single fd, and thus issue
@@ -465,62 +465,45 @@ netbox_decode_greeting(lua_State *L)
  * interaction.
  */
 static int
-netbox_communicate(lua_State *L)
+netbox_communicate(int fd, struct ibuf *send_buf, struct ibuf *recv_buf,
+		   size_t limit, const void *boundary, size_t boundary_len,
+		   double timeout, size_t *limit_or_boundary_pos)
 {
-	uint32_t fd = lua_tonumber(L, 1);
 	const int NETBOX_READAHEAD = 16320;
-	struct ibuf *send_buf = (struct ibuf *) lua_topointer(L, 2);
-	struct ibuf *recv_buf = (struct ibuf *) lua_topointer(L, 3);
-
-	/* limit or boundary */
-	size_t limit = SIZE_MAX;
-	const void *boundary = NULL;
-	size_t boundary_len;
-
-	if (lua_type(L, 4) == LUA_TSTRING)
-		boundary = lua_tolstring(L, 4, &boundary_len);
-	else
-		limit = lua_tonumber(L, 4);
-
-	/* timeout */
-	ev_tstamp timeout = TIMEOUT_INFINITY;
-	if (lua_type(L, 5) == LUA_TNUMBER)
-		timeout = lua_tonumber(L, 5);
 	if (timeout < 0) {
-		lua_pushinteger(L, ER_TIMEOUT);
-		lua_pushstring(L, "Timeout exceeded");
-		return 2;
+		diag_set(ClientError, ER_TIMEOUT);
+		return -1;
 	}
 	int revents = COIO_READ;
 	while (true) {
 		/* reader serviced first */
 check_limit:
 		if (ibuf_used(recv_buf) >= limit) {
-			lua_pushnil(L);
-			lua_pushinteger(L, (lua_Integer)limit);
-			return 2;
+			*limit_or_boundary_pos = limit;
+			return 0;
 		}
 		const char *p;
 		if (boundary != NULL && (p = memmem(
 					recv_buf->rpos,
 					ibuf_used(recv_buf),
 					boundary, boundary_len)) != NULL) {
-			lua_pushnil(L);
-			lua_pushinteger(L, (lua_Integer)(
-					p - recv_buf->rpos));
-			return 2;
+			*limit_or_boundary_pos = p - recv_buf->rpos;
+			return 0;
 		}
 
 		while (revents & COIO_READ) {
 			void *p = ibuf_reserve(recv_buf, NETBOX_READAHEAD);
-			if (p == NULL)
-				luaL_error(L, "out of memory");
+			if (p == NULL) {
+				diag_set(OutOfMemory, NETBOX_READAHEAD,
+					 "ibuf", "recv_buf");
+				return -1;
+			}
 			ssize_t rc = recv(
 				fd, recv_buf->wpos, ibuf_unused(recv_buf), 0);
 			if (rc == 0) {
-				lua_pushinteger(L, ER_NO_CONNECTION);
-				lua_pushstring(L, "Peer closed");
-				return 2;
+				box_error_raise(ER_NO_CONNECTION,
+						"Peer closed");
+				return -1;
 			} if (rc > 0) {
 				recv_buf->wpos += rc;
 				goto check_limit;
@@ -545,19 +528,100 @@ check_limit:
 		ERROR_INJECT_YIELD(ERRINJ_NETBOX_IO_DELAY);
 		revents = coio_wait(fd, EV_READ | (ibuf_used(send_buf) != 0 ?
 				EV_WRITE : 0), timeout);
-		luaL_testcancel(L);
+		if (fiber_is_cancelled()) {
+			diag_set(FiberIsCancelled);
+			return -1;
+		}
 		timeout = deadline - ev_monotonic_now(loop());
 		timeout = MAX(0.0, timeout);
 		if (revents == 0 && timeout == 0.0) {
-			lua_pushinteger(L, ER_TIMEOUT);
-			lua_pushstring(L, "Timeout exceeded");
-			return 2;
+			diag_set(ClientError, ER_TIMEOUT);
+			return -1;
 		}
 	}
 handle_error:
-	lua_pushinteger(L, ER_NO_CONNECTION);
-	lua_pushstring(L, strerror(errno));
-	return 2;
+	box_error_raise(ER_NO_CONNECTION, "%s", strerror(errno));
+	return -1;
+}
+
+/*
+ * Sends and receives data over an iproto connection.
+ * Takes socket fd, send_buf (ibuf), recv_buf (ibuf), timeout.
+ * On success returns header (table), body_rpos (char *), body_end (char *).
+ * On error returns nil, error.
+ */
+static int
+netbox_send_and_recv_iproto(lua_State *L)
+{
+	int fd = lua_tointeger(L, 1);
+	struct ibuf *send_buf = (struct ibuf *) lua_topointer(L, 2);
+	struct ibuf *recv_buf = (struct ibuf *) lua_topointer(L, 3);
+	double timeout = (!lua_isnoneornil(L, 4) ?
+			  lua_tonumber(L, 4) : TIMEOUT_INFINITY);
+	while (true) {
+		size_t required;
+		size_t data_len = ibuf_used(recv_buf);
+		size_t fixheader_size = mp_sizeof_uint(UINT32_MAX);
+		if (data_len < fixheader_size) {
+			required = fixheader_size;
+		} else {
+			/* PWN! insufficient input validation */
+			const char *bufpos = recv_buf->rpos;
+			const char *rpos = bufpos;
+			size_t len = mp_decode_uint(&rpos);
+			required = (rpos - bufpos) + len;
+			if (data_len >= required) {
+				const char *body_end = rpos + len;
+				const char *body_rpos = rpos;
+				luamp_decode(L, cfg, &body_rpos);
+				*(const char **)luaL_pushcdata(
+					L, CTID_CONST_CHAR_PTR) = body_rpos;
+				*(const char **)luaL_pushcdata(
+					L, CTID_CONST_CHAR_PTR) = body_end;
+				recv_buf->rpos = (char *)body_end;
+				return 3;
+			}
+		}
+		size_t unused;
+		double deadline = fiber_clock() + timeout;
+		if (netbox_communicate(fd, send_buf, recv_buf,
+				       /*limit=*/required,
+				       /*boundary=*/NULL,
+				       /*boundary_len=*/0,
+				       timeout, &unused) != 0) {
+			luaL_testcancel(L);
+			return luaT_push_nil_and_error(L);
+		}
+		timeout = deadline - fiber_clock();
+	}
+}
+
+/*
+ * Sends and receives data over a console connection.
+ * Takes socket fd, send_buf (ibuf), recv_buf (ibuf), timeout.
+ * On success returns response (string).
+ * On error returns nil, error.
+ */
+static int
+netbox_send_and_recv_console(lua_State *L)
+{
+	int fd = lua_tointeger(L, 1);
+	struct ibuf *send_buf = (struct ibuf *) lua_topointer(L, 2);
+	struct ibuf *recv_buf = (struct ibuf *) lua_topointer(L, 3);
+	double timeout = (!lua_isnoneornil(L, 4) ?
+			  lua_tonumber(L, 4) : TIMEOUT_INFINITY);
+	const char delim[] = "\n...\n";
+	size_t delim_len = sizeof(delim) - 1;
+	size_t delim_pos;
+	if (netbox_communicate(fd, send_buf, recv_buf, /*limit=*/SIZE_MAX,
+			       delim, delim_len, timeout, &delim_pos) != 0) {
+
+		luaL_testcancel(L);
+		return luaT_push_nil_and_error(L);
+	}
+	lua_pushlstring(L, recv_buf->rpos, delim_pos + delim_len);
+	recv_buf->rpos += delim_pos + delim_len;
+	return 1;
 }
 
 static void
@@ -1150,7 +1214,8 @@ luaopen_net_box(struct lua_State *L)
 		{ "decode_greeting",netbox_decode_greeting },
 		{ "decode_method",  netbox_decode_method },
 		{ "decode_error",   netbox_decode_error },
-		{ "communicate",    netbox_communicate },
+		{ "send_and_recv_iproto", netbox_send_and_recv_iproto },
+		{ "send_and_recv_console", netbox_send_and_recv_console },
 		{ NULL, NULL}
 	};
 	/* luaL_register_module polutes _G */
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index d7394b088752..0ad6cac022f2 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -22,7 +22,6 @@ local check_index_arg     = box.internal.check_index_arg
 local check_space_arg     = box.internal.check_space_arg
 local check_primary_index = box.internal.check_primary_index
 
-local communicate     = internal.communicate
 local encode_auth     = internal.encode_auth
 local encode_method   = internal.encode_method
 local decode_greeting = internal.decode_greeting
@@ -582,46 +581,23 @@ local function create_transport(host, port, user, password, callback,
     end
 
     -- IO (WORKER FIBER) --
-    local function send_and_recv(limit_or_boundary, timeout)
-        return communicate(connection:fd(), send_buf, recv_buf,
-                           limit_or_boundary, timeout)
-    end
-
     local function send_and_recv_iproto(timeout)
-        local data_len = recv_buf.wpos - recv_buf.rpos
-        local required
-        if data_len < 5 then
-            required = 5
-        else
-            -- PWN! insufficient input validation
-            local bufpos = recv_buf.rpos
-            local len, rpos = decode(bufpos)
-            required = (rpos - bufpos) + len
-            if data_len >= required then
-                local body_end = rpos + len
-                local hdr, body_rpos = decode(rpos)
-                recv_buf.rpos = body_end
-                return nil, hdr, body_rpos, body_end
-            end
+        local hdr, body_rpos, body_end = internal.send_and_recv_iproto(
+            connection:fd(), send_buf, recv_buf, timeout)
+        if not hdr then
+            local err = body_rpos
+            return err.code, err.message
         end
-        local deadline = fiber_clock() + (timeout or TIMEOUT_INFINITY)
-        local err, extra = send_and_recv(required, timeout)
-        if err then
-            return err, extra
-        end
-        return send_and_recv_iproto(max(0, deadline - fiber_clock()))
+        return nil, hdr, body_rpos, body_end
     end
 
     local function send_and_recv_console(timeout)
-        local delim = '\n...\n'
-        local err, delim_pos = send_and_recv(delim, timeout)
-        if err then
-            return err, delim_pos
-        else
-            local response = ffi.string(recv_buf.rpos, delim_pos + #delim)
-            recv_buf.rpos = recv_buf.rpos + delim_pos + #delim
-            return nil, response
+        local response, err = internal.send_and_recv_console(
+            connection:fd(), send_buf, recv_buf, timeout)
+        if not response then
+            return err.code, err.message
         end
+        return nil, response
     end
 
     -- PROTOCOL STATE MACHINE (WORKER FIBER) --
-- 
2.25.1


  parent reply	other threads:[~2021-07-23 11:14 UTC|newest]

Thread overview: 80+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2021-07-23 11:07 [Tarantool-patches] [PATCH 00/20] Rewrite performance critical parts of net.box " Vladimir Davydov via Tarantool-patches
2021-07-23 11:07 ` [Tarantool-patches] [PATCH 01/20] net.box: fix console connection breakage when request is discarded Vladimir Davydov via Tarantool-patches
2021-07-28 22:49   ` Vladislav Shpilevoy via Tarantool-patches
2021-07-29 10:40     ` Vladimir Davydov via Tarantool-patches
2021-07-23 11:07 ` [Tarantool-patches] [PATCH 02/20] net.box: wake up wait_result callers " Vladimir Davydov via Tarantool-patches
2021-07-29 10:47   ` Vladimir Davydov via Tarantool-patches
2021-07-23 11:07 ` [Tarantool-patches] [PATCH 03/20] net.box: do not check worker_fiber in request:result, is_ready Vladimir Davydov via Tarantool-patches
2021-07-23 11:07 ` [Tarantool-patches] [PATCH 04/20] net.box: remove decode_push from method_decoder table Vladimir Davydov via Tarantool-patches
2021-07-23 11:07 ` [Tarantool-patches] [PATCH 05/20] net.box: use decode_tuple instead of decode_get Vladimir Davydov via Tarantool-patches
2021-07-23 11:07 ` [Tarantool-patches] [PATCH 06/20] net.box: rename request.ctx to request.format Vladimir Davydov via Tarantool-patches
2021-07-28 22:49   ` Vladislav Shpilevoy via Tarantool-patches
2021-07-29 10:54     ` Vladimir Davydov via Tarantool-patches
2021-07-29 22:39       ` Vladislav Shpilevoy via Tarantool-patches
2021-07-30  8:15         ` Vladimir Davydov via Tarantool-patches
2021-07-23 11:07 ` [Tarantool-patches] [PATCH 07/20] net.box: use integer id instead of method name Vladimir Davydov via Tarantool-patches
2021-07-28 22:50   ` Vladislav Shpilevoy via Tarantool-patches
2021-07-29 11:30     ` Vladimir Davydov via Tarantool-patches
2021-07-23 11:07 ` [Tarantool-patches] [PATCH 08/20] net.box: remove useless encode optimization Vladimir Davydov via Tarantool-patches
2021-07-23 11:07 ` [Tarantool-patches] [PATCH 09/20] net.box: rewrite request encoder in C Vladimir Davydov via Tarantool-patches
2021-07-28 22:51   ` Vladislav Shpilevoy via Tarantool-patches
2021-07-29 14:08     ` Vladimir Davydov via Tarantool-patches
2021-07-29 14:10       ` Vladimir Davydov via Tarantool-patches
2021-07-23 11:07 ` [Tarantool-patches] [PATCH 10/20] lua/utils: make char ptr Lua CTIDs public Vladimir Davydov via Tarantool-patches
2021-07-23 11:07 ` [Tarantool-patches] [PATCH 11/20] net.box: rewrite response decoder in C Vladimir Davydov via Tarantool-patches
2021-07-27 14:07   ` Cyrill Gorcunov via Tarantool-patches
2021-07-27 14:14     ` Vladimir Davydov via Tarantool-patches
2021-07-29 22:39   ` Vladislav Shpilevoy via Tarantool-patches
2021-07-30  8:44     ` Vladimir Davydov via Tarantool-patches
2021-07-30 22:12       ` Vladislav Shpilevoy via Tarantool-patches
2021-08-02  7:36         ` Vladimir Davydov via Tarantool-patches
2021-07-23 11:07 ` [Tarantool-patches] [PATCH 12/20] net.box: rewrite error " Vladimir Davydov via Tarantool-patches
2021-07-30 22:13   ` Vladislav Shpilevoy via Tarantool-patches
2021-08-02  8:00     ` Vladimir Davydov via Tarantool-patches
2021-08-02 21:47       ` Vladislav Shpilevoy via Tarantool-patches
2021-07-23 11:07 ` Vladimir Davydov via Tarantool-patches [this message]
2021-08-02 21:49   ` [Tarantool-patches] [PATCH 13/20] net.box: rewrite send_and_recv_{iproto, console} " Vladislav Shpilevoy via Tarantool-patches
2021-08-03 15:44     ` Vladimir Davydov via Tarantool-patches
2021-08-03 23:06       ` Vladislav Shpilevoy via Tarantool-patches
2021-08-04 13:56         ` Vladimir Davydov via Tarantool-patches
2021-08-04 21:18           ` Vladislav Shpilevoy via Tarantool-patches
2021-08-05  8:37             ` Vladimir Davydov via Tarantool-patches
2021-07-23 11:07 ` [Tarantool-patches] [PATCH 14/20] net.box: rename netbox_{prepare, encode}_request to {begin, end} Vladimir Davydov via Tarantool-patches
2021-07-23 11:07 ` [Tarantool-patches] [PATCH 15/20] net.box: rewrite request implementation in C Vladimir Davydov via Tarantool-patches
2021-08-02 21:54   ` Vladislav Shpilevoy via Tarantool-patches
2021-08-04 12:30     ` Vladimir Davydov via Tarantool-patches
2021-08-04 15:35       ` Vladimir Davydov via Tarantool-patches
2021-08-04 16:14         ` Vladimir Davydov via Tarantool-patches
2021-08-04 21:20       ` Vladislav Shpilevoy via Tarantool-patches
2021-08-05 12:46         ` Vladimir Davydov via Tarantool-patches
2021-07-23 11:07 ` [Tarantool-patches] [PATCH 16/20] net.box: store next_request_id in C code Vladimir Davydov via Tarantool-patches
2021-08-03 23:06   ` Vladislav Shpilevoy via Tarantool-patches
2021-08-04 16:25     ` Vladimir Davydov via Tarantool-patches
2021-07-23 11:07 ` [Tarantool-patches] [PATCH 17/20] net.box: rewrite console handlers in C Vladimir Davydov via Tarantool-patches
2021-08-03 23:07   ` Vladislav Shpilevoy via Tarantool-patches
2021-08-05 11:53     ` Vladimir Davydov via Tarantool-patches
2021-07-23 11:07 ` [Tarantool-patches] [PATCH 18/20] net.box: rewrite iproto " Vladimir Davydov via Tarantool-patches
2021-08-03 23:08   ` Vladislav Shpilevoy via Tarantool-patches
2021-08-05 11:54     ` Vladimir Davydov via Tarantool-patches
2021-07-23 11:07 ` [Tarantool-patches] [PATCH 19/20] net.box: merge new_id, new_request and encode_method Vladimir Davydov via Tarantool-patches
2021-08-03 23:08   ` Vladislav Shpilevoy via Tarantool-patches
2021-08-05 11:55     ` Vladimir Davydov via Tarantool-patches
2021-07-23 11:07 ` [Tarantool-patches] [PATCH 20/20] net.box: do not create request object in Lua for sync requests Vladimir Davydov via Tarantool-patches
2021-08-03 23:09   ` Vladislav Shpilevoy via Tarantool-patches
2021-08-05 12:23     ` Vladimir Davydov via Tarantool-patches
2021-07-23 12:48 ` [Tarantool-patches] [PATCH 00/20] Rewrite performance critical parts of net.box in C Vladimir Davydov via Tarantool-patches
2021-07-26  7:26 ` Kirill Yukhin via Tarantool-patches
2021-07-27  9:59   ` Vladimir Davydov via Tarantool-patches
2021-07-28 22:51 ` Vladislav Shpilevoy via Tarantool-patches
2021-07-29 11:33   ` Vladimir Davydov via Tarantool-patches
2021-07-29 15:23     ` Vladimir Davydov via Tarantool-patches
2021-07-29 22:38       ` Vladislav Shpilevoy via Tarantool-patches
2021-07-30 10:04         ` Vladimir Davydov via Tarantool-patches
2021-07-29 22:40 ` Vladislav Shpilevoy via Tarantool-patches
2021-07-30  8:16   ` Vladimir Davydov via Tarantool-patches
2021-08-03 23:05 ` Vladislav Shpilevoy via Tarantool-patches
2021-08-04 12:40   ` Vladimir Davydov via Tarantool-patches
2021-08-05 20:59 ` Vladislav Shpilevoy via Tarantool-patches
2021-08-09 11:22 ` Igor Munkin via Tarantool-patches
2021-08-09 11:48   ` Vitaliia Ioffe via Tarantool-patches
2021-08-09 13:56     ` Vladimir Davydov via Tarantool-patches

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=5bc68e715031c04ebb022c70cf4ff27c0939598c.1627024646.git.vdavydov@tarantool.org \
    --to=tarantool-patches@dev.tarantool.org \
    --cc=vdavydov@tarantool.org \
    --subject='Re: [Tarantool-patches] [PATCH 13/20] net.box: rewrite send_and_recv_{iproto, console} in C' \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox