[Tarantool-patches] [PATCH 1/4] xrow: rework coio_read_xrow to keep parsed input

Serge Petrenko sergepetrenko at tarantool.org
Mon Dec 6 06:03:20 MSK 2021


coio_read_xrow(_timeout_xc) uses an ibuf as temporary storage for
input. It parses the xrow upon receipt and immediately discards the
input buffer. This means that the body of a just-parsed xrow may be used
only until the next row is read: each new read may reuse the space
where the row's body was located.

So once the user has to read multiple rows (say, a transaction) before
processing them, they are forced to copy the rows' bodies to some other
location.

It is better to let the user decide whether the input should be
discarded immediately or not.

Introduce coio_read_xrow_ex(_timeout_xc), which doesn't discard the
input upon parse, and uses ibuf->xpos instead of ibuf->rpos to point at
the end of parsed data. This way the user is free to decide when to
advance ibuf->rpos and discard the input.

Introduce macros with old behaviour for coio_read_xrow(_timeout_xc).

Prerequisite #6329
---
 src/box/xrow_io.cc | 28 ++++++++++++++--------------
 src/box/xrow_io.h  | 18 +++++++++++++++---
 src/lib/small      |  2 +-
 src/lua/buffer.lua |  2 ++
 4 files changed, 32 insertions(+), 18 deletions(-)

diff --git a/src/box/xrow_io.cc b/src/box/xrow_io.cc
index 20e327239..9d0d4b12e 100644
--- a/src/box/xrow_io.cc
+++ b/src/box/xrow_io.cc
@@ -36,61 +36,61 @@
 #include "msgpuck/msgpuck.h"
 
 void
-coio_read_xrow(struct iostream *io, struct ibuf *in, struct xrow_header *row)
+coio_read_xrow_ex(struct iostream *io, struct ibuf *in, struct xrow_header *row)
 {
 	/* Read fixed header */
-	if (ibuf_used(in) < 1)
+	if (ibuf_unparsed(in) < 1)
 		coio_breadn(io, in, 1);
 
 	/* Read length */
-	if (mp_typeof(*in->rpos) != MP_UINT) {
+	if (mp_typeof(*in->xpos) != MP_UINT) {
 		tnt_raise(ClientError, ER_INVALID_MSGPACK,
 			  "packet length");
 	}
-	ssize_t to_read = mp_check_uint(in->rpos, in->wpos);
+	ssize_t to_read = mp_check_uint(in->xpos, in->wpos);
 	if (to_read > 0)
 		coio_breadn(io, in, to_read);
 
-	uint32_t len = mp_decode_uint((const char **) &in->rpos);
+	uint32_t len = mp_decode_uint((const char **) &in->xpos);
 
 	/* Read header and body */
-	to_read = len - ibuf_used(in);
+	to_read = len - ibuf_unparsed(in);
 	if (to_read > 0)
 		coio_breadn(io, in, to_read);
 
-	xrow_header_decode_xc(row, (const char **) &in->rpos, in->rpos + len,
+	xrow_header_decode_xc(row, (const char **) &in->xpos, in->xpos + len,
 			      true);
 }
 
 void
-coio_read_xrow_timeout_xc(struct iostream *io, struct ibuf *in,
+coio_read_xrow_ex_timeout_xc(struct iostream *io, struct ibuf *in,
 			  struct xrow_header *row, ev_tstamp timeout)
 {
 	ev_tstamp start, delay;
 	coio_timeout_init(&start, &delay, timeout);
 	/* Read fixed header */
-	if (ibuf_used(in) < 1)
+	if (ibuf_unparsed(in) < 1)
 		coio_breadn_timeout(io, in, 1, delay);
 	coio_timeout_update(&start, &delay);
 
 	/* Read length */
-	if (mp_typeof(*in->rpos) != MP_UINT) {
+	if (mp_typeof(*in->xpos) != MP_UINT) {
 		tnt_raise(ClientError, ER_INVALID_MSGPACK,
 			  "packet length");
 	}
-	ssize_t to_read = mp_check_uint(in->rpos, in->wpos);
+	ssize_t to_read = mp_check_uint(in->xpos, in->wpos);
 	if (to_read > 0)
 		coio_breadn_timeout(io, in, to_read, delay);
 	coio_timeout_update(&start, &delay);
 
-	uint32_t len = mp_decode_uint((const char **) &in->rpos);
+	uint32_t len = mp_decode_uint((const char **) &in->xpos);
 
 	/* Read header and body */
-	to_read = len - ibuf_used(in);
+	to_read = len - ibuf_unparsed(in);
 	if (to_read > 0)
 		coio_breadn_timeout(io, in, to_read, delay);
 
-	xrow_header_decode_xc(row, (const char **) &in->rpos, in->rpos + len,
+	xrow_header_decode_xc(row, (const char **) &in->xpos, in->xpos + len,
 			      true);
 }
 
diff --git a/src/box/xrow_io.h b/src/box/xrow_io.h
index 4e81b6e0c..5ecdd108e 100644
--- a/src/box/xrow_io.h
+++ b/src/box/xrow_io.h
@@ -39,11 +39,23 @@ struct iostream;
 struct xrow_header;
 
 void
-coio_read_xrow(struct iostream *io, struct ibuf *in, struct xrow_header *row);
+coio_read_xrow_ex(struct iostream *io, struct ibuf *in, struct xrow_header *row);
+
+#define coio_read_xrow(io, in, row)					\
+	do {								\
+		coio_read_xrow_ex(io, in, row);			\
+		(in)->rpos = (in)->xpos;				\
+	} while (0)
 
 void
-coio_read_xrow_timeout_xc(struct iostream *io, struct ibuf *in,
-			  struct xrow_header *row, double timeout);
+coio_read_xrow_ex_timeout_xc(struct iostream *io, struct ibuf *in,
+			     struct xrow_header *row, double timeout);
+
+#define coio_read_xrow_timeout_xc(io, in, row, timeout)		\
+	do {								\
+		coio_read_xrow_ex_timeout_xc(io, in, row, timeout);	\
+		(in)->rpos = (in)->xpos;				\
+	} while (0)
 
 void
 coio_write_xrow(struct iostream *io, const struct xrow_header *row);
diff --git a/src/lib/small b/src/lib/small
index 3d15a7058..47ca9ee09 160000
--- a/src/lib/small
+++ b/src/lib/small
@@ -1 +1 @@
-Subproject commit 3d15a705817ff60ef6fe5e4b70ae4c09056927e3
+Subproject commit 47ca9ee09a9e1ae052c1111b3220751d3fe035b0
diff --git a/src/lua/buffer.lua b/src/lua/buffer.lua
index 182c0b015..cab0544e6 100644
--- a/src/lua/buffer.lua
+++ b/src/lua/buffer.lua
@@ -23,6 +23,8 @@ struct ibuf
     char *buf;
     /** Start of input. */
     char *rpos;
+    /** End of parsed input. */
+    char *xpos;
     /** End of useful input */
     char *wpos;
     /** End of ibuf. */
-- 
2.30.1 (Apple Git-130)



More information about the Tarantool-patches mailing list