[Tarantool-patches] [PATCH v4 10/11] wal: use a xrow buffer object for entry encoding

Georgy Kirichenko georgy at tarantool.org
Wed Feb 12 12:39:19 MSK 2020


WAL uses an xrow buffer object to encode transactions and then
writes the encoded data to a log file, so the encoded data stays
in memory for some time after a transaction is finished.

Part of #3794, #980
---
 src/box/wal.c      | 51 ++++++++++++++++++++++++++++++++++++++++-
 src/box/xlog.c     | 57 +++++++++++++++++++++++++++++-----------------
 src/box/xlog.h     | 14 ++++++++++++
 src/box/xrow_buf.h |  3 +--
 4 files changed, 101 insertions(+), 24 deletions(-)

diff --git a/src/box/wal.c b/src/box/wal.c
index ad3c79a8a..b483b8cc4 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -44,6 +44,7 @@
 #include "coio_task.h"
 #include "replication.h"
 #include "mclock.h"
+#include "xrow_buf.h"
 
 enum {
 	/**
@@ -174,6 +175,14 @@ struct wal_writer
 	 * collecting. Ignored in case of no space error.
 	 */
 	struct vclock gc_first_vclock;
+	/**
+	 * In-memory WAL write buffer used to encode transaction rows and
+	 * write them to an xlog file. An in-memory buffer allows us to
+	 * preserve xrows after transaction processing has finished.
+	 * This buffer will be used by replication to fetch xrows from memory
+	 * without accessing xlog files.
+	 */
+	struct xrow_buf xrow_buf;
 };
 
 struct wal_msg {
@@ -1040,6 +1049,7 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base,
 	int64_t tsn = 0;
 	/** Assign LSN to all local rows. */
 	for ( ; row < end; row++) {
+		(*row)->tm = ev_now(loop());
 		if ((*row)->replica_id == 0) {
 			(*row)->lsn = vclock_inc(vclock_diff, instance_id) +
 				      vclock_get(base, instance_id);
@@ -1067,6 +1077,35 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base,
  * the output queue. The function returns count of written
  * bytes or -1 in case of error.
  */
+static ssize_t
+wal_encode_write_entry(struct wal_writer *writer, struct journal_entry *entry)
+{
+	struct errinj *inj = errinj(ERRINJ_WAL_BREAK_LSN, ERRINJ_INT);
+	if (inj != NULL) {
+		for (struct xrow_header **row = entry->rows;
+		     row < entry->rows + entry->n_rows; ++row) {
+			if (inj->iparam == (*row)->lsn) {
+				(*row)->lsn = inj->iparam - 1;
+				say_warn("injected broken lsn: %lld",
+					 (long long) (*row)->lsn);
+				break;
+			}
+		}
+	}
+
+	struct iovec *iov;
+	int iov_cnt = xrow_buf_write(&writer->xrow_buf, entry->rows,
+				     entry->rows + entry->n_rows, &iov);
+	if (iov_cnt < 0)
+		return -1;
+	xlog_tx_begin(&writer->current_wal);
+	ssize_t rc = xlog_write_iov(&writer->current_wal, iov, iov_cnt,
+				    entry->n_rows);
+	if (rc < 0)
+		return rc;
+	return xlog_tx_commit(&writer->current_wal);
+}
+
 static ssize_t
 wal_write_xlog_batch(struct wal_writer *writer, struct stailq *input,
 		     struct stailq *output, struct vclock *vclock_diff)
@@ -1082,7 +1121,7 @@ wal_write_xlog_batch(struct wal_writer *writer, struct stailq *input,
 			       entry->rows, entry->rows + entry->n_rows);
 		entry->res = vclock_sum(vclock_diff) +
 			     vclock_sum(&writer->vclock);
-		rc = xlog_write_entry(l, entry);
+		rc = wal_encode_write_entry(writer, entry);
 	} while (rc == 0 && !stailq_empty(input));
 	/* If log was not flushed then flush it explicitly. */
 	if (rc == 0)
@@ -1155,9 +1194,12 @@ wal_write_to_disk(struct cmsg *msg)
 	struct stailq output;
 	stailq_create(&output);
 	while (!stailq_empty(&input)) {
+		/* Start a wal memory buffer transaction. */
+		xrow_buf_tx_begin(&writer->xrow_buf, &writer->vclock);
 		ssize_t rc = wal_write_xlog_batch(writer, &input, &output,
 						  &vclock_diff);
 		if (rc < 0) {
+			xrow_buf_tx_rollback(&writer->xrow_buf);
 			/*
 			 * Put processed entries and tail of write
 			 * queue to a rollback list.
@@ -1165,6 +1207,7 @@ wal_write_to_disk(struct cmsg *msg)
 			stailq_concat(&wal_msg->rollback, &output);
 			stailq_concat(&wal_msg->rollback, &input);
 		} else {
+			xrow_buf_tx_commit(&writer->xrow_buf);
 			/*
 			 * Schedule processed entries to commit
 			 * and update the wal vclock.
@@ -1254,6 +1297,11 @@ wal_writer_f(va_list ap)
 {
 	(void) ap;
 	struct wal_writer *writer = &wal_writer_singleton;
+	/*
+	 * Initialize writer memory buffer here because it
+	 * should be done in the wal thread.
+	 */
+	xrow_buf_create(&writer->xrow_buf);
 
 	/** Initialize eio in this thread */
 	coio_enable();
@@ -1300,6 +1348,7 @@ wal_writer_f(va_list ap)
 		xlog_close(&vy_log_writer.xlog, false);
 
 	cpipe_destroy(&writer->tx_prio_pipe);
+	xrow_buf_destroy(&writer->xrow_buf);
 	return 0;
 }
 
diff --git a/src/box/xlog.c b/src/box/xlog.c
index 8254cce20..2fcf7f0df 100644
--- a/src/box/xlog.c
+++ b/src/box/xlog.c
@@ -1275,14 +1275,8 @@ xlog_tx_write(struct xlog *log)
 	return written;
 }
 
-/*
- * Add a row to a log and possibly flush the log.
- *
- * @retval  -1 error, check diag.
- * @retval >=0 the number of bytes written to buffer.
- */
-ssize_t
-xlog_write_row(struct xlog *log, const struct xrow_header *packet)
+static int
+xlog_write_prepare(struct xlog *log)
 {
 	/*
 	 * Automatically reserve space for a fixheader when adding
@@ -1296,17 +1290,20 @@ xlog_write_row(struct xlog *log, const struct xrow_header *packet)
 			return -1;
 		}
 	}
+	return 0;
+}
 
-	struct obuf_svp svp = obuf_create_svp(&log->obuf);
-	size_t page_offset = obuf_size(&log->obuf);
-	/** encode row into iovec */
-	struct iovec iov[XROW_IOVMAX];
-	/** don't write sync to the disk */
-	int iovcnt = xrow_header_encode(packet, 0, iov, 0);
-	if (iovcnt < 0) {
-		obuf_rollback_to_svp(&log->obuf, &svp);
+/*
+ * Write an iovec containing encoded xrows to an xlog.
+ */
+ssize_t
+xlog_write_iov(struct xlog *log, struct iovec *iov, int iovcnt, int row_count)
+{
+	if (xlog_write_prepare(log) != 0)
 		return -1;
-	}
+
+	struct obuf_svp svp = obuf_create_svp(&log->obuf);
+	size_t old_size = obuf_size(&log->obuf);
 	for (int i = 0; i < iovcnt; ++i) {
 		struct errinj *inj = errinj(ERRINJ_WAL_WRITE_PARTIAL,
 					    ERRINJ_INT);
@@ -1325,16 +1322,34 @@ xlog_write_row(struct xlog *log, const struct xrow_header *packet)
 			return -1;
 		}
 	}
-	assert(iovcnt <= XROW_IOVMAX);
-	log->tx_rows++;
+	log->tx_rows += row_count;
 
-	size_t row_size = obuf_size(&log->obuf) - page_offset;
+	ssize_t written = obuf_size(&log->obuf) - old_size;
 	if (log->is_autocommit &&
 	    obuf_size(&log->obuf) >= XLOG_TX_AUTOCOMMIT_THRESHOLD &&
 	    xlog_tx_write(log) < 0)
 		return -1;
 
-	return row_size;
+	return written;
+}
+
+/*
+ * Add a row to a log and possibly flush the log.
+ *
+ * @retval  -1 error, check diag.
+ * @retval >=0 the number of bytes written to buffer.
+ */
+ssize_t
+xlog_write_row(struct xlog *log, const struct xrow_header *packet)
+{
+	/** encode row into an iovec */
+	struct iovec iov[XROW_IOVMAX];
+	/** don't write sync to the disk */
+	int iovcnt = xrow_header_encode(packet, 0, iov, 0);
+	if (iovcnt < 0)
+		return -1;
+	assert(iovcnt <= XROW_IOVMAX);
+	return xlog_write_iov(log, iov, iovcnt, 1);
 }
 
 /**
diff --git a/src/box/xlog.h b/src/box/xlog.h
index a48b05fc4..d0f97f0e4 100644
--- a/src/box/xlog.h
+++ b/src/box/xlog.h
@@ -501,6 +501,20 @@ xlog_fallocate(struct xlog *log, size_t size);
 ssize_t
 xlog_write_row(struct xlog *log, const struct xrow_header *packet);
 
+/**
+ * Write an iovec with encoded rows into an xlog.
+ *
+ * @param xlog an xlog file to write into
+ * @param iov an iovec with encoded rows data
+ * @param iovcnt count of iovec members
+ * @param row_count count of encoded rows
+ *
+ * @retval count of written bytes
+ * @retval -1 for error
+ */
+ssize_t
+xlog_write_iov(struct xlog *xlog, struct iovec *iov, int iovcnt, int row_count);
+
 /**
  * Prevent xlog row buffer offloading, should be use
  * at transaction start to write transaction in one xlog tx
diff --git a/src/box/xrow_buf.h b/src/box/xrow_buf.h
index c5f624d45..d286a94d3 100644
--- a/src/box/xrow_buf.h
+++ b/src/box/xrow_buf.h
@@ -154,8 +154,7 @@ xrow_buf_tx_commit(struct xrow_buf *xrow_buf);
  */
 int
 xrow_buf_write(struct xrow_buf *xrow_buf, struct xrow_header **begin,
-	       struct xrow_header **end,
-	       struct iovec **iovec);
+	       struct xrow_header **end, struct iovec **iovec);
 
 /**
  * Xrow buffer cursor used to search a position in a buffer
-- 
2.25.0



More information about the Tarantool-patches mailing list