Tarantool development patches archive
 help / color / mirror / Atom feed
From: Serge Petrenko via Tarantool-patches <tarantool-patches@dev.tarantool.org>
To: v.shpilevoy@tarantool.org, vdavydov@tarantool.org
Cc: tarantool-patches@dev.tarantool.org
Subject: [Tarantool-patches] [PATCH 2/4] applier: reuse input buffer to store row bodies
Date: Mon,  6 Dec 2021 06:03:21 +0300	[thread overview]
Message-ID: <c5ddcd89749010c4518358c41f4a223fe420fe5c.1638757827.git.sergepetrenko@tarantool.org> (raw)
In-Reply-To: <cover.1638757827.git.sergepetrenko@tarantool.org>

Applier, the main user of coio_read_xrow(), had to save row bodies
elsewhere, because there was no guarantee that the input buffer wouldn't
be reused before the body was processed.

This changed in the previous commit (xrow: rework coio_read_xrow to keep
parsed input), so it's time to take advantage of this change in applier.

Stop saving row bodies to the fiber gc region and store them directly in
the ibuf instead.
Introduce the machinery needed to track ibuf reallocation.

Prerequisite #6329
---
 src/box/applier.cc | 86 +++++++++++++++++++++++++++++++++-------------
 1 file changed, 63 insertions(+), 23 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 6036c19d9..a8505c93a 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -553,6 +553,22 @@ struct applier_tx_row {
 	struct xrow_header row;
 };
 
+/** Defragment ibuf: reset it if empty, else move its data to the beginning. */
+static inline void
+ibuf_defragment(struct ibuf *ibuf)
+{
+	size_t used = ibuf_used(ibuf);
+	if (used == 0) {
+		ibuf_reset(ibuf);
+	} else {
+		size_t cap = ibuf_capacity(ibuf);
+		/*
+		 * Reserving all the free space makes ibuf compact its data.
+		 */
+		ibuf_reserve(ibuf, cap - used);
+	}
+}
+
 static uint64_t
 applier_wait_register(struct applier *applier, uint64_t row_count)
 {
@@ -589,8 +605,12 @@ applier_wait_register(struct applier *applier, uint64_t row_count)
 		}
 		if (apply_final_join_tx(applier->instance_id, &rows) != 0)
 			diag_raise();
+		/* @sa applier_subscribe(). */
+		applier->ibuf.rpos = applier->ibuf.xpos;
+		ibuf_defragment(&applier->ibuf);
 	}
 
+	ibuf_reset(&applier->ibuf);
 	return row_count;
 }
 
@@ -676,7 +696,7 @@ applier_read_tx_row(struct applier *applier, double timeout)
 
 	ERROR_INJECT_YIELD(ERRINJ_APPLIER_READ_TX_ROW_DELAY);
 
-	coio_read_xrow_timeout_xc(io, ibuf, row, timeout);
+	coio_read_xrow_ex_timeout_xc(io, ibuf, row, timeout);
 
 	if (row->tm > 0)
 		applier->lag = ev_now(loop()) - row->tm;
@@ -722,20 +742,6 @@ set_next_tx_row(struct stailq *rows, struct applier_tx_row *tx_row, int64_t tsn)
 	if (row->is_commit) {
 		/* Signal the caller that we've reached the tx end. */
 		tsn = 0;
-	} else if (row->bodycnt == 1) {
-		/*
-		 * Save row body to gc region. Not done for single-statement
-		 * transactions and the last row of multi-statement transactions
-		 * knowing that the input buffer will not be used while the
-		 * transaction is applied.
-		 */
-		void *new_base = region_alloc(&fiber()->gc, row->body->iov_len);
-		if (new_base == NULL)
-			tnt_raise(OutOfMemory, row->body->iov_len, "region",
-				  "xrow body");
-		memcpy(new_base, row->body->iov_base, row->body->iov_len);
-		/* Adjust row body pointers. */
-		row->body->iov_base = new_base;
 	}
 
 	stailq_add_tail(rows, &tx_row->next);
@@ -744,10 +750,19 @@ set_next_tx_row(struct stailq *rows, struct applier_tx_row *tx_row, int64_t tsn)
 
 /**
  * Read one transaction from network using applier's input buffer.
- * Transaction rows are placed onto fiber gc region.
- * We could not use applier input buffer to store rows because
- * rpos is adjusted as xrow is decoded and the corresponding
- * network input space is reused for the next xrow.
+ *
+ * The input buffer is reused to store row bodies. The only two problems to
+ * deal with are sporadic input buffer reallocation and defragmentation.
+ * We have to adjust row body pointers each time either of the two occurs.
+ *
+ * Defragmentation is done manually in between the transaction reads, so it
+ * **never** happens inside this function (ibuf->rpos always points at
+ * the very beginning of the ibuf).
+ *
+ * Speaking of buffer reallocation, it only happens during the "saturation"
+ * phase, until the input buffer reaches the size big enough to hold a single
+ * transaction. Moreover, each next reallocation is exponentially less likely
+ * to happen, because the buffer size is doubled every time.
  */
 static uint64_t
 applier_read_tx(struct applier *applier, struct stailq *rows, double timeout)
@@ -757,8 +772,26 @@ applier_read_tx(struct applier *applier, struct stailq *rows, double timeout)
 
 	stailq_create(rows);
 	do {
+		const char *old_rpos = applier->ibuf.rpos;
 		struct applier_tx_row *tx_row = applier_read_tx_row(applier,
 								    timeout);
+		/* Detect ibuf reallocation or defragmentation. */
+		ssize_t delta = applier->ibuf.rpos - old_rpos;
+		if (unlikely(delta != 0)) {
+			struct applier_tx_row *item;
+			stailq_foreach_entry(item, rows, next) {
+				struct xrow_header *row = &item->row;
+				if (row->bodycnt == 0)
+					continue;
+				/*
+				 * Each row body's offset from ibuf->rpos is
+				 * constant, so all the bodies were moved by the
+				 * same delta as rpos was.
+				 */
+				row->body->iov_base =
+					(char *)row->body->iov_base + delta;
+			}
+		}
 		tsn = set_next_tx_row(rows, tx_row, tsn);
 		++row_count;
 	} while (tsn != 0);
@@ -1123,8 +1156,8 @@ nopify:;
 		row = &item->row;
 		row->type = IPROTO_NOP;
 		/*
-		 * Row body is saved to fiber's region and will be freed
-		 * on next fiber_gc() call.
+		 * Row body will be discarded together with the remaining
+		 * input.
 		 */
 		row->bodycnt = 0;
 	}
@@ -1434,8 +1467,15 @@ applier_subscribe(struct applier *applier)
 			diag_raise();
 		}
 
-		if (ibuf_used(ibuf) == 0)
-			ibuf_reset(ibuf);
+		/* Discard processed input. */
+		ibuf->rpos = ibuf->xpos;
+		/*
+		 * Even though this is not necessary, defragment the buffer
+		 * explicitly. Otherwise the defragmentation would be triggered
+		 * by one of the row reads, resulting in moving a bigger memory
+		 * chunk.
+		 */
+		ibuf_defragment(&applier->ibuf);
 	}
 }
 
-- 
2.30.1 (Apple Git-130)


  parent reply	other threads:[~2021-12-06  3:04 UTC|newest]

Thread overview: 8+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2021-12-06  3:03 [Tarantool-patches] [PATCH 0/4] replication: introduce applier thread Serge Petrenko via Tarantool-patches
2021-12-06  3:03 ` [Tarantool-patches] [PATCH 1/4] xrow: rework coio_read_xrow to keep parsed input Serge Petrenko via Tarantool-patches
2021-12-06  3:05   ` Serge Petrenko via Tarantool-patches
2021-12-06  3:03 ` Serge Petrenko via Tarantool-patches [this message]
2021-12-06  3:03 ` [Tarantool-patches] [PATCH 3/4] applier: factor replication stream processing out of subscribe() Serge Petrenko via Tarantool-patches
2021-12-06  3:03 ` [Tarantool-patches] [PATCH 4/4] Introduce applier thread Serge Petrenko via Tarantool-patches
2021-12-06  9:59 ` [Tarantool-patches] [PATCH 0/4] replication: introduce " Vladimir Davydov via Tarantool-patches
2021-12-06 10:31   ` Serge Petrenko via Tarantool-patches

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=c5ddcd89749010c4518358c41f4a223fe420fe5c.1638757827.git.sergepetrenko@tarantool.org \
    --to=tarantool-patches@dev.tarantool.org \
    --cc=sergepetrenko@tarantool.org \
    --cc=v.shpilevoy@tarantool.org \
    --cc=vdavydov@tarantool.org \
    --subject='Re: [Tarantool-patches] [PATCH 2/4] applier: reuse input buffer to store row bodies' \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox