From: Serge Petrenko via Tarantool-patches <tarantool-patches@dev.tarantool.org> To: v.shpilevoy@tarantool.org, vdavydov@tarantool.org Cc: tarantool-patches@dev.tarantool.org Subject: [Tarantool-patches] [PATCH 1/4] xrow: rework coio_read_xrow to keep parsed input Date: Mon, 6 Dec 2021 06:03:20 +0300 [thread overview] Message-ID: <76b802ed3cfa45a87f2610a9e05b71bc14dfb36a.1638757827.git.sergepetrenko@tarantool.org> (raw) In-Reply-To: <cover.1638757827.git.sergepetrenko@tarantool.org> coio_read_xrow(_timeout_xc) uses an ibuf as temporary storage for input. It parses the xrow upon receipt and immediately discards the input buffer. This means that the just-parsed xrow's body may be used only until the next row is read: each new read may reuse the space where the row's body was located. So whenever the user has to read multiple rows (say, a transaction) before processing them, they are forced to copy the rows' bodies to some other location. It is better to let the user decide whether input should be immediately discarded or not. Introduce coio_read_xrow_ex(_timeout_xc), which doesn't discard the input upon parse, and uses ibuf->xpos instead of ibuf->rpos to point at the end of parsed data. This way the user is free to decide when to advance ibuf->rpos and discard the input. Introduce macros with the old behaviour for coio_read_xrow(_timeout_xc). 
Prerequisite #6329 --- src/box/xrow_io.cc | 28 ++++++++++++++-------------- src/box/xrow_io.h | 18 +++++++++++++++--- src/lib/small | 2 +- src/lua/buffer.lua | 2 ++ 4 files changed, 32 insertions(+), 18 deletions(-) diff --git a/src/box/xrow_io.cc b/src/box/xrow_io.cc index 20e327239..9d0d4b12e 100644 --- a/src/box/xrow_io.cc +++ b/src/box/xrow_io.cc @@ -36,61 +36,61 @@ #include "msgpuck/msgpuck.h" void -coio_read_xrow(struct iostream *io, struct ibuf *in, struct xrow_header *row) +coio_read_xrow_ex(struct iostream *io, struct ibuf *in, struct xrow_header *row) { /* Read fixed header */ - if (ibuf_used(in) < 1) + if (ibuf_unparsed(in) < 1) coio_breadn(io, in, 1); /* Read length */ - if (mp_typeof(*in->rpos) != MP_UINT) { + if (mp_typeof(*in->xpos) != MP_UINT) { tnt_raise(ClientError, ER_INVALID_MSGPACK, "packet length"); } - ssize_t to_read = mp_check_uint(in->rpos, in->wpos); + ssize_t to_read = mp_check_uint(in->xpos, in->wpos); if (to_read > 0) coio_breadn(io, in, to_read); - uint32_t len = mp_decode_uint((const char **) &in->rpos); + uint32_t len = mp_decode_uint((const char **) &in->xpos); /* Read header and body */ - to_read = len - ibuf_used(in); + to_read = len - ibuf_unparsed(in); if (to_read > 0) coio_breadn(io, in, to_read); - xrow_header_decode_xc(row, (const char **) &in->rpos, in->rpos + len, + xrow_header_decode_xc(row, (const char **) &in->xpos, in->xpos + len, true); } void -coio_read_xrow_timeout_xc(struct iostream *io, struct ibuf *in, +coio_read_xrow_ex_timeout_xc(struct iostream *io, struct ibuf *in, struct xrow_header *row, ev_tstamp timeout) { ev_tstamp start, delay; coio_timeout_init(&start, &delay, timeout); /* Read fixed header */ - if (ibuf_used(in) < 1) + if (ibuf_unparsed(in) < 1) coio_breadn_timeout(io, in, 1, delay); coio_timeout_update(&start, &delay); /* Read length */ - if (mp_typeof(*in->rpos) != MP_UINT) { + if (mp_typeof(*in->xpos) != MP_UINT) { tnt_raise(ClientError, ER_INVALID_MSGPACK, "packet length"); } - ssize_t to_read = 
mp_check_uint(in->rpos, in->wpos); + ssize_t to_read = mp_check_uint(in->xpos, in->wpos); if (to_read > 0) coio_breadn_timeout(io, in, to_read, delay); coio_timeout_update(&start, &delay); - uint32_t len = mp_decode_uint((const char **) &in->rpos); + uint32_t len = mp_decode_uint((const char **) &in->xpos); /* Read header and body */ - to_read = len - ibuf_used(in); + to_read = len - ibuf_unparsed(in); if (to_read > 0) coio_breadn_timeout(io, in, to_read, delay); - xrow_header_decode_xc(row, (const char **) &in->rpos, in->rpos + len, + xrow_header_decode_xc(row, (const char **) &in->xpos, in->xpos + len, true); } diff --git a/src/box/xrow_io.h b/src/box/xrow_io.h index 4e81b6e0c..5ecdd108e 100644 --- a/src/box/xrow_io.h +++ b/src/box/xrow_io.h @@ -39,11 +39,23 @@ struct iostream; struct xrow_header; void -coio_read_xrow(struct iostream *io, struct ibuf *in, struct xrow_header *row); +coio_read_xrow_ex(struct iostream *io, struct ibuf *in, struct xrow_header *row); + +#define coio_read_xrow(io, in, row) \ + do { \ + coio_read_xrow_ex(io, in, row); \ + (in)->rpos = (in)->xpos; \ + } while (0) void -coio_read_xrow_timeout_xc(struct iostream *io, struct ibuf *in, - struct xrow_header *row, double timeout); +coio_read_xrow_ex_timeout_xc(struct iostream *io, struct ibuf *in, + struct xrow_header *row, double timeout); + +#define coio_read_xrow_timeout_xc(io, in, row, timeout) \ + do { \ + coio_read_xrow_ex_timeout_xc(io, in, row, timeout); \ + (in)->rpos = (in)->xpos; \ + } while (0) void coio_write_xrow(struct iostream *io, const struct xrow_header *row); diff --git a/src/lib/small b/src/lib/small index 3d15a7058..47ca9ee09 160000 --- a/src/lib/small +++ b/src/lib/small @@ -1 +1 @@ -Subproject commit 3d15a705817ff60ef6fe5e4b70ae4c09056927e3 +Subproject commit 47ca9ee09a9e1ae052c1111b3220751d3fe035b0 diff --git a/src/lua/buffer.lua b/src/lua/buffer.lua index 182c0b015..cab0544e6 100644 --- a/src/lua/buffer.lua +++ b/src/lua/buffer.lua @@ -23,6 +23,8 @@ struct ibuf char 
*buf; /** Start of input. */ char *rpos; + /** End of parsed input. */ + char *xpos; /** End of useful input */ char *wpos; /** End of ibuf. */ -- 2.30.1 (Apple Git-130)
next prev parent reply other threads:[~2021-12-06 3:03 UTC|newest] Thread overview: 8+ messages / expand[flat|nested] mbox.gz Atom feed top 2021-12-06 3:03 [Tarantool-patches] [PATCH 0/4] replication: introduce applier thread Serge Petrenko via Tarantool-patches 2021-12-06 3:03 ` Serge Petrenko via Tarantool-patches [this message] 2021-12-06 3:05 ` [Tarantool-patches] [PATCH 1/4] xrow: rework coio_read_xrow to keep parsed input Serge Petrenko via Tarantool-patches 2021-12-06 3:03 ` [Tarantool-patches] [PATCH 2/4] applier: reuse input buffer to store row bodies Serge Petrenko via Tarantool-patches 2021-12-06 3:03 ` [Tarantool-patches] [PATCH 3/4] applier: factor replication stream processing out of subscribe() Serge Petrenko via Tarantool-patches 2021-12-06 3:03 ` [Tarantool-patches] [PATCH 4/4] Introduce applier thread Serge Petrenko via Tarantool-patches 2021-12-06 9:59 ` [Tarantool-patches] [PATCH 0/4] replication: introduce " Vladimir Davydov via Tarantool-patches 2021-12-06 10:31 ` Serge Petrenko via Tarantool-patches
Reply instructions: You may reply publicly to this message via plain-text email using any one of the following methods: * Save the following mbox file, import it into your mail client, and reply-to-all from there: mbox Avoid top-posting and favor interleaved quoting: https://en.wikipedia.org/wiki/Posting_style#Interleaved_style * Reply using the --to, --cc, and --in-reply-to switches of git-send-email(1): git send-email \ --in-reply-to=76b802ed3cfa45a87f2610a9e05b71bc14dfb36a.1638757827.git.sergepetrenko@tarantool.org \ --to=tarantool-patches@dev.tarantool.org \ --cc=sergepetrenko@tarantool.org \ --cc=v.shpilevoy@tarantool.org \ --cc=vdavydov@tarantool.org \ --subject='Re: [Tarantool-patches] [PATCH 1/4] xrow: rework coio_read_xrow to keep parsed input' \ /path/to/YOUR_REPLY https://kernel.org/pub/software/scm/git/docs/git-send-email.html * If your mail client supports setting the In-Reply-To header via mailto: links, try the mailto: link
This is a public inbox, see mirroring instructions for how to clone and mirror all data and code used for this inbox