Tarantool development patches archive
 help / color / mirror / Atom feed
From: Serge Petrenko via Tarantool-patches <tarantool-patches@dev.tarantool.org>
To: v.shpilevoy@tarantool.org, vdavydov@tarantool.org
Cc: tarantool-patches@dev.tarantool.org
Subject: [Tarantool-patches] [PATCH 1/4] xrow: rework coio_read_xrow to keep parsed input
Date: Mon,  6 Dec 2021 06:03:20 +0300	[thread overview]
Message-ID: <76b802ed3cfa45a87f2610a9e05b71bc14dfb36a.1638757827.git.sergepetrenko@tarantool.org> (raw)
In-Reply-To: <cover.1638757827.git.sergepetrenko@tarantool.org>

coio_read_xrow(_timeout_xc) uses an ibuf as a temporary storage for
input. It parses the xrow upon receipt and immediately discards the
input buffer. This means that the just parsed xrow's body may be used
only until the next row is read: each new read may reuse the space
where row's body was located.

So once the user has to read multiple rows (say, a transaction) before
processing them, he is forced to copy rows' bodies to some other
location.

Let's better let the user decide whether input should be immediately
discarded or not.

Introduce coio_read_xrow_ex(_timeout_xc), which doesn't discard the
input upon parse, and uses ibuf->xpos instead of ibuf->rpos to point at
the end of parsed data. This way the user is free to decide when to
advance ibuf->rpos and discard the input.

Introduce macros with old behaviour for xrow_read_xrow(_timeout_xc).

Prerequisite #6329
---
 src/box/xrow_io.cc | 28 ++++++++++++++--------------
 src/box/xrow_io.h  | 18 +++++++++++++++---
 src/lib/small      |  2 +-
 src/lua/buffer.lua |  2 ++
 4 files changed, 32 insertions(+), 18 deletions(-)

diff --git a/src/box/xrow_io.cc b/src/box/xrow_io.cc
index 20e327239..9d0d4b12e 100644
--- a/src/box/xrow_io.cc
+++ b/src/box/xrow_io.cc
@@ -36,61 +36,61 @@
 #include "msgpuck/msgpuck.h"
 
 void
-coio_read_xrow(struct iostream *io, struct ibuf *in, struct xrow_header *row)
+coio_read_xrow_ex(struct iostream *io, struct ibuf *in, struct xrow_header *row)
 {
 	/* Read fixed header */
-	if (ibuf_used(in) < 1)
+	if (ibuf_unparsed(in) < 1)
 		coio_breadn(io, in, 1);
 
 	/* Read length */
-	if (mp_typeof(*in->rpos) != MP_UINT) {
+	if (mp_typeof(*in->xpos) != MP_UINT) {
 		tnt_raise(ClientError, ER_INVALID_MSGPACK,
 			  "packet length");
 	}
-	ssize_t to_read = mp_check_uint(in->rpos, in->wpos);
+	ssize_t to_read = mp_check_uint(in->xpos, in->wpos);
 	if (to_read > 0)
 		coio_breadn(io, in, to_read);
 
-	uint32_t len = mp_decode_uint((const char **) &in->rpos);
+	uint32_t len = mp_decode_uint((const char **) &in->xpos);
 
 	/* Read header and body */
-	to_read = len - ibuf_used(in);
+	to_read = len - ibuf_unparsed(in);
 	if (to_read > 0)
 		coio_breadn(io, in, to_read);
 
-	xrow_header_decode_xc(row, (const char **) &in->rpos, in->rpos + len,
+	xrow_header_decode_xc(row, (const char **) &in->xpos, in->xpos + len,
 			      true);
 }
 
 void
-coio_read_xrow_timeout_xc(struct iostream *io, struct ibuf *in,
+coio_read_xrow_ex_timeout_xc(struct iostream *io, struct ibuf *in,
 			  struct xrow_header *row, ev_tstamp timeout)
 {
 	ev_tstamp start, delay;
 	coio_timeout_init(&start, &delay, timeout);
 	/* Read fixed header */
-	if (ibuf_used(in) < 1)
+	if (ibuf_unparsed(in) < 1)
 		coio_breadn_timeout(io, in, 1, delay);
 	coio_timeout_update(&start, &delay);
 
 	/* Read length */
-	if (mp_typeof(*in->rpos) != MP_UINT) {
+	if (mp_typeof(*in->xpos) != MP_UINT) {
 		tnt_raise(ClientError, ER_INVALID_MSGPACK,
 			  "packet length");
 	}
-	ssize_t to_read = mp_check_uint(in->rpos, in->wpos);
+	ssize_t to_read = mp_check_uint(in->xpos, in->wpos);
 	if (to_read > 0)
 		coio_breadn_timeout(io, in, to_read, delay);
 	coio_timeout_update(&start, &delay);
 
-	uint32_t len = mp_decode_uint((const char **) &in->rpos);
+	uint32_t len = mp_decode_uint((const char **) &in->xpos);
 
 	/* Read header and body */
-	to_read = len - ibuf_used(in);
+	to_read = len - ibuf_unparsed(in);
 	if (to_read > 0)
 		coio_breadn_timeout(io, in, to_read, delay);
 
-	xrow_header_decode_xc(row, (const char **) &in->rpos, in->rpos + len,
+	xrow_header_decode_xc(row, (const char **) &in->xpos, in->xpos + len,
 			      true);
 }
 
diff --git a/src/box/xrow_io.h b/src/box/xrow_io.h
index 4e81b6e0c..5ecdd108e 100644
--- a/src/box/xrow_io.h
+++ b/src/box/xrow_io.h
@@ -39,11 +39,23 @@ struct iostream;
 struct xrow_header;
 
 void
-coio_read_xrow(struct iostream *io, struct ibuf *in, struct xrow_header *row);
+coio_read_xrow_ex(struct iostream *io, struct ibuf *in, struct xrow_header *row);
+
+#define coio_read_xrow(io, in, row)					\
+	do {								\
+		coio_read_xrow_ex(io, in, row);			\
+		(in)->rpos = (in)->xpos;				\
+	} while (0)
 
 void
-coio_read_xrow_timeout_xc(struct iostream *io, struct ibuf *in,
-			  struct xrow_header *row, double timeout);
+coio_read_xrow_ex_timeout_xc(struct iostream *io, struct ibuf *in,
+			     struct xrow_header *row, double timeout);
+
+#define coio_read_xrow_timeout_xc(io, in, row, timeout)		\
+	do {								\
+		coio_read_xrow_ex_timeout_xc(io, in, row, timeout);	\
+		(in)->rpos = (in)->xpos;				\
+	} while (0)
 
 void
 coio_write_xrow(struct iostream *io, const struct xrow_header *row);
diff --git a/src/lib/small b/src/lib/small
index 3d15a7058..47ca9ee09 160000
--- a/src/lib/small
+++ b/src/lib/small
@@ -1 +1 @@
-Subproject commit 3d15a705817ff60ef6fe5e4b70ae4c09056927e3
+Subproject commit 47ca9ee09a9e1ae052c1111b3220751d3fe035b0
diff --git a/src/lua/buffer.lua b/src/lua/buffer.lua
index 182c0b015..cab0544e6 100644
--- a/src/lua/buffer.lua
+++ b/src/lua/buffer.lua
@@ -23,6 +23,8 @@ struct ibuf
     char *buf;
     /** Start of input. */
     char *rpos;
+    /** End of parsed input. */
+    char *xpos;
     /** End of useful input */
     char *wpos;
     /** End of ibuf. */
-- 
2.30.1 (Apple Git-130)


  reply	other threads:[~2021-12-06  3:03 UTC|newest]

Thread overview: 8+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2021-12-06  3:03 [Tarantool-patches] [PATCH 0/4] replication: introduce applier thread Serge Petrenko via Tarantool-patches
2021-12-06  3:03 ` Serge Petrenko via Tarantool-patches [this message]
2021-12-06  3:05   ` [Tarantool-patches] [PATCH 1/4] xrow: rework coio_read_xrow to keep parsed input Serge Petrenko via Tarantool-patches
2021-12-06  3:03 ` [Tarantool-patches] [PATCH 2/4] applier: reuse input buffer to store row bodies Serge Petrenko via Tarantool-patches
2021-12-06  3:03 ` [Tarantool-patches] [PATCH 3/4] applier: factor replication stream processing out of subscribe() Serge Petrenko via Tarantool-patches
2021-12-06  3:03 ` [Tarantool-patches] [PATCH 4/4] Introduce applier thread Serge Petrenko via Tarantool-patches
2021-12-06  9:59 ` [Tarantool-patches] [PATCH 0/4] replication: introduce " Vladimir Davydov via Tarantool-patches
2021-12-06 10:31   ` Serge Petrenko via Tarantool-patches

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=76b802ed3cfa45a87f2610a9e05b71bc14dfb36a.1638757827.git.sergepetrenko@tarantool.org \
    --to=tarantool-patches@dev.tarantool.org \
    --cc=sergepetrenko@tarantool.org \
    --cc=v.shpilevoy@tarantool.org \
    --cc=vdavydov@tarantool.org \
    --subject='Re: [Tarantool-patches] [PATCH 1/4] xrow: rework coio_read_xrow to keep parsed input' \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox