[Tarantool-patches] [PATCH 2/4] applier: reuse input buffer to store row bodies

Serge Petrenko sergepetrenko at tarantool.org
Mon Dec 6 06:03:21 MSK 2021


The applier, the main user of coio_read_xrow(), had to save row bodies
elsewhere, because there was no guarantee that the input buffer wouldn't
be reused before a body was processed.

This changed in the previous commit (xrow: rework coio_read_xrow to keep
parsed input), so it's time to take advantage of it in the applier.

Stop saving row bodies to the fiber gc region and store them right in the
input buffer (ibuf) instead.
Introduce the machinery needed to track ibuf reallocation and
defragmentation, both of which move the stored row bodies.

Prerequisite #6329
---
 src/box/applier.cc | 86 +++++++++++++++++++++++++++++++++-------------
 1 file changed, 63 insertions(+), 23 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 6036c19d9..a8505c93a 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -553,6 +553,22 @@ struct applier_tx_row {
 	struct xrow_header row;
 };
 
+/** Defragment the input buffer: move its contents, if any, to its beginning. */
+static inline void
+ibuf_defragment(struct ibuf *ibuf)
+{
+	size_t used = ibuf_used(ibuf);
+	if (used == 0) {
+		ibuf_reset(ibuf);
+	} else {
+		size_t cap = ibuf_capacity(ibuf);
+		/*
+		 * Defragment the buffer by reserving all the available space.
+		 */
+		ibuf_reserve(ibuf, cap - used);
+	}
+}
+
 static uint64_t
 applier_wait_register(struct applier *applier, uint64_t row_count)
 {
@@ -589,8 +605,12 @@ applier_wait_register(struct applier *applier, uint64_t row_count)
 		}
 		if (apply_final_join_tx(applier->instance_id, &rows) != 0)
 			diag_raise();
+		/* @sa applier_subscribe(). */
+		applier->ibuf.rpos = applier->ibuf.xpos;
+		ibuf_defragment(&applier->ibuf);
 	}
 
+	ibuf_reset(&applier->ibuf);
 	return row_count;
 }
 
@@ -676,7 +696,7 @@ applier_read_tx_row(struct applier *applier, double timeout)
 
 	ERROR_INJECT_YIELD(ERRINJ_APPLIER_READ_TX_ROW_DELAY);
 
-	coio_read_xrow_timeout_xc(io, ibuf, row, timeout);
+	coio_read_xrow_ex_timeout_xc(io, ibuf, row, timeout);
 
 	if (row->tm > 0)
 		applier->lag = ev_now(loop()) - row->tm;
@@ -722,20 +742,6 @@ set_next_tx_row(struct stailq *rows, struct applier_tx_row *tx_row, int64_t tsn)
 	if (row->is_commit) {
 		/* Signal the caller that we've reached the tx end. */
 		tsn = 0;
-	} else if (row->bodycnt == 1) {
-		/*
-		 * Save row body to gc region. Not done for single-statement
-		 * transactions and the last row of multi-statement transactions
-		 * knowing that the input buffer will not be used while the
-		 * transaction is applied.
-		 */
-		void *new_base = region_alloc(&fiber()->gc, row->body->iov_len);
-		if (new_base == NULL)
-			tnt_raise(OutOfMemory, row->body->iov_len, "region",
-				  "xrow body");
-		memcpy(new_base, row->body->iov_base, row->body->iov_len);
-		/* Adjust row body pointers. */
-		row->body->iov_base = new_base;
 	}
 
 	stailq_add_tail(rows, &tx_row->next);
@@ -744,10 +750,19 @@ set_next_tx_row(struct stailq *rows, struct applier_tx_row *tx_row, int64_t tsn)
 
 /**
  * Read one transaction from network using applier's input buffer.
- * Transaction rows are placed onto fiber gc region.
- * We could not use applier input buffer to store rows because
- * rpos is adjusted as xrow is decoded and the corresponding
- * network input space is reused for the next xrow.
+ *
+ * The input buffer is reused to store row bodies. The only two problems to
+ * deal with are sporadic input buffer reallocation and defragmentation.
+ * We have to adjust row body pointers whenever either of them occurs.
+ *
+ * Defragmentation is done manually in between the transaction reads, so it
+ * **never** happens inside this function (ibuf->rpos always points at
+ * the very beginning of the ibuf).
+ *
+ * Speaking of buffer reallocation, it only happens during the "saturation"
+ * phase, until the input buffer reaches the size big enough to hold a single
+ * transaction. Moreover, each next reallocation is exponentially less likely
+ * to happen, because the buffer size is doubled every time.
  */
 static uint64_t
 applier_read_tx(struct applier *applier, struct stailq *rows, double timeout)
@@ -757,8 +772,26 @@ applier_read_tx(struct applier *applier, struct stailq *rows, double timeout)
 
 	stailq_create(rows);
 	do {
+		const char *old_rpos = applier->ibuf.rpos;
 		struct applier_tx_row *tx_row = applier_read_tx_row(applier,
 								    timeout);
+		/* Detect ibuf reallocation or defragmentation. */
+		ssize_t delta = applier->ibuf.rpos - old_rpos;
+		if (unlikely(delta != 0)) {
+			struct applier_tx_row *item;
+			stailq_foreach_entry(item, rows, next) {
+				struct xrow_header *row = &item->row;
+				if (row->bodycnt == 0)
+					continue;
+				/*
+				 * The row body's offset relative to ibuf->rpos
+				 * is constant, so they all were moved by the
+				 * same delta as rpos was.
+				 */
+				row->body->iov_base =
+					(char *)row->body->iov_base + delta;
+			}
+		}
 		tsn = set_next_tx_row(rows, tx_row, tsn);
 		++row_count;
 	} while (tsn != 0);
@@ -1123,8 +1156,8 @@ nopify:;
 		row = &item->row;
 		row->type = IPROTO_NOP;
 		/*
-		 * Row body is saved to fiber's region and will be freed
-		 * on next fiber_gc() call.
+		 * Row body will be discarded together with the remaining
+		 * input.
 		 */
 		row->bodycnt = 0;
 	}
@@ -1434,8 +1467,15 @@ applier_subscribe(struct applier *applier)
 			diag_raise();
 		}
 
-		if (ibuf_used(ibuf) == 0)
-			ibuf_reset(ibuf);
+		/* Discard processed input. */
+		ibuf->rpos = ibuf->xpos;
+		/*
+		 * Even though this is not necessary, defragment the buffer
+		 * explicitly. Otherwise the defragmentation would be triggered
+		 * by one of the row reads, resulting in moving a bigger memory
+		 * chunk.
+		 */
+		ibuf_defragment(&applier->ibuf);
 	}
 }
 
-- 
2.30.1 (Apple Git-130)



More information about the Tarantool-patches mailing list