[Tarantool-patches] [PATCH v4 10/11] wal: use a xrow buffer object for entry encoding
Georgy Kirichenko
georgy at tarantool.org
Wed Feb 12 12:39:19 MSK 2020
WAL now uses an xrow buffer object to encode transactions and
then writes the encoded data to a log file, so the encoded data stays
in memory for some time after a transaction is finished.
Part of #3794, #980
---
src/box/wal.c | 51 ++++++++++++++++++++++++++++++++++++++++-
src/box/xlog.c | 57 +++++++++++++++++++++++++++++-----------------
src/box/xlog.h | 14 ++++++++++++
src/box/xrow_buf.h | 3 +--
4 files changed, 101 insertions(+), 24 deletions(-)
diff --git a/src/box/wal.c b/src/box/wal.c
index ad3c79a8a..b483b8cc4 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -44,6 +44,7 @@
#include "coio_task.h"
#include "replication.h"
#include "mclock.h"
+#include "xrow_buf.h"
enum {
/**
@@ -174,6 +175,14 @@ struct wal_writer
* collecting. Ignored in case of no space error.
*/
struct vclock gc_first_vclock;
+ /**
+ * In-memory WAL write buffer used to encode transaction rows and
+ * write them to an xlog file. An in-memory buffer allows us to
+ * preserve xrows after transaction processing has finished.
+ * This buffer will be used by replication to fetch xrows from memory
+ * without accessing xlog files.
+ */
+ struct xrow_buf xrow_buf;
};
struct wal_msg {
@@ -1040,6 +1049,7 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base,
int64_t tsn = 0;
/** Assign LSN to all local rows. */
for ( ; row < end; row++) {
+ (*row)->tm = ev_now(loop());
if ((*row)->replica_id == 0) {
(*row)->lsn = vclock_inc(vclock_diff, instance_id) +
vclock_get(base, instance_id);
@@ -1067,6 +1077,35 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base,
* the output queue. The function returns count of written
* bytes or -1 in case of error.
*/
+static ssize_t
+wal_encode_write_entry(struct wal_writer *writer, struct journal_entry *entry)
+{
+ struct errinj *inj = errinj(ERRINJ_WAL_BREAK_LSN, ERRINJ_INT);
+ if (inj != NULL) {
+ for (struct xrow_header **row = entry->rows;
+ row < entry->rows + entry->n_rows; ++row) {
+ if (inj->iparam == (*row)->lsn) {
+ (*row)->lsn = inj->iparam - 1;
+ say_warn("injected broken lsn: %lld",
+ (long long) (*row)->lsn);
+ break;
+ }
+ }
+ }
+
+ struct iovec *iov;
+ int iov_cnt = xrow_buf_write(&writer->xrow_buf, entry->rows,
+ entry->rows + entry->n_rows, &iov);
+ if (iov_cnt < 0)
+ return -1;
+ xlog_tx_begin(&writer->current_wal);
+ ssize_t rc = xlog_write_iov(&writer->current_wal, iov, iov_cnt,
+ entry->n_rows);
+ if (rc < 0)
+ return rc;
+ return xlog_tx_commit(&writer->current_wal);
+}
+
static ssize_t
wal_write_xlog_batch(struct wal_writer *writer, struct stailq *input,
struct stailq *output, struct vclock *vclock_diff)
@@ -1082,7 +1121,7 @@ wal_write_xlog_batch(struct wal_writer *writer, struct stailq *input,
entry->rows, entry->rows + entry->n_rows);
entry->res = vclock_sum(vclock_diff) +
vclock_sum(&writer->vclock);
- rc = xlog_write_entry(l, entry);
+ rc = wal_encode_write_entry(writer, entry);
} while (rc == 0 && !stailq_empty(input));
/* If log was not flushed then flush it explicitly. */
if (rc == 0)
@@ -1155,9 +1194,12 @@ wal_write_to_disk(struct cmsg *msg)
struct stailq output;
stailq_create(&output);
while (!stailq_empty(&input)) {
+ /* Start a wal memory buffer transaction. */
+ xrow_buf_tx_begin(&writer->xrow_buf, &writer->vclock);
ssize_t rc = wal_write_xlog_batch(writer, &input, &output,
&vclock_diff);
if (rc < 0) {
+ xrow_buf_tx_rollback(&writer->xrow_buf);
/*
* Put processed entries and tail of write
* queue to a rollback list.
@@ -1165,6 +1207,7 @@ wal_write_to_disk(struct cmsg *msg)
stailq_concat(&wal_msg->rollback, &output);
stailq_concat(&wal_msg->rollback, &input);
} else {
+ xrow_buf_tx_commit(&writer->xrow_buf);
/*
* Schedule processed entries to commit
* and update the wal vclock.
@@ -1254,6 +1297,11 @@ wal_writer_f(va_list ap)
{
(void) ap;
struct wal_writer *writer = &wal_writer_singleton;
+ /*
+ * Initialize writer memory buffer here because it
+ * should be done in the wal thread.
+ */
+ xrow_buf_create(&writer->xrow_buf);
/** Initialize eio in this thread */
coio_enable();
@@ -1300,6 +1348,7 @@ wal_writer_f(va_list ap)
xlog_close(&vy_log_writer.xlog, false);
cpipe_destroy(&writer->tx_prio_pipe);
+ xrow_buf_destroy(&writer->xrow_buf);
return 0;
}
diff --git a/src/box/xlog.c b/src/box/xlog.c
index 8254cce20..2fcf7f0df 100644
--- a/src/box/xlog.c
+++ b/src/box/xlog.c
@@ -1275,14 +1275,8 @@ xlog_tx_write(struct xlog *log)
return written;
}
-/*
- * Add a row to a log and possibly flush the log.
- *
- * @retval -1 error, check diag.
- * @retval >=0 the number of bytes written to buffer.
- */
-ssize_t
-xlog_write_row(struct xlog *log, const struct xrow_header *packet)
+static int
+xlog_write_prepare(struct xlog *log)
{
/*
* Automatically reserve space for a fixheader when adding
@@ -1296,17 +1290,20 @@ xlog_write_row(struct xlog *log, const struct xrow_header *packet)
return -1;
}
}
+ return 0;
+}
- struct obuf_svp svp = obuf_create_svp(&log->obuf);
- size_t page_offset = obuf_size(&log->obuf);
- /** encode row into iovec */
- struct iovec iov[XROW_IOVMAX];
- /** don't write sync to the disk */
- int iovcnt = xrow_header_encode(packet, 0, iov, 0);
- if (iovcnt < 0) {
- obuf_rollback_to_svp(&log->obuf, &svp);
+/*
+ * Write an iovec containing encoded xrows to an xlog.
+ */
+ssize_t
+xlog_write_iov(struct xlog *log, struct iovec *iov, int iovcnt, int row_count)
+{
+ if (xlog_write_prepare(log) != 0)
return -1;
- }
+
+ struct obuf_svp svp = obuf_create_svp(&log->obuf);
+ size_t old_size = obuf_size(&log->obuf);
for (int i = 0; i < iovcnt; ++i) {
struct errinj *inj = errinj(ERRINJ_WAL_WRITE_PARTIAL,
ERRINJ_INT);
@@ -1325,16 +1322,34 @@ xlog_write_row(struct xlog *log, const struct xrow_header *packet)
return -1;
}
}
- assert(iovcnt <= XROW_IOVMAX);
- log->tx_rows++;
+ log->tx_rows += row_count;
- size_t row_size = obuf_size(&log->obuf) - page_offset;
+ ssize_t written = obuf_size(&log->obuf) - old_size;
if (log->is_autocommit &&
obuf_size(&log->obuf) >= XLOG_TX_AUTOCOMMIT_THRESHOLD &&
xlog_tx_write(log) < 0)
return -1;
- return row_size;
+ return written;
+}
+
+/*
+ * Add a row to a log and possibly flush the log.
+ *
+ * @retval -1 error, check diag.
+ * @retval >=0 the number of bytes written to buffer.
+ */
+ssize_t
+xlog_write_row(struct xlog *log, const struct xrow_header *packet)
+{
+ /** encode row into an iovec */
+ struct iovec iov[XROW_IOVMAX];
+ /** don't write sync to the disk */
+ int iovcnt = xrow_header_encode(packet, 0, iov, 0);
+ if (iovcnt < 0)
+ return -1;
+ assert(iovcnt <= XROW_IOVMAX);
+ return xlog_write_iov(log, iov, iovcnt, 1);
}
/**
diff --git a/src/box/xlog.h b/src/box/xlog.h
index a48b05fc4..d0f97f0e4 100644
--- a/src/box/xlog.h
+++ b/src/box/xlog.h
@@ -501,6 +501,20 @@ xlog_fallocate(struct xlog *log, size_t size);
ssize_t
xlog_write_row(struct xlog *log, const struct xrow_header *packet);
+/**
+ * Write an iovec with encoded rows into an xlog.
+ *
+ * @param xlog a xlog file to write into
+ * @param iov an iovec with encoded rows data
+ * @param iovcnt count of iovec members
+ * @param row_count count of encoded rows
+ *
+ * @retval count of written bytes
+ * @retval -1 for error
+ */
+ssize_t
+xlog_write_iov(struct xlog *xlog, struct iovec *iov, int iovcnt, int row_count);
+
/**
* Prevent xlog row buffer offloading, should be use
* at transaction start to write transaction in one xlog tx
diff --git a/src/box/xrow_buf.h b/src/box/xrow_buf.h
index c5f624d45..d286a94d3 100644
--- a/src/box/xrow_buf.h
+++ b/src/box/xrow_buf.h
@@ -154,8 +154,7 @@ xrow_buf_tx_commit(struct xrow_buf *xrow_buf);
*/
int
xrow_buf_write(struct xrow_buf *xrow_buf, struct xrow_header **begin,
- struct xrow_header **end,
- struct iovec **iovec);
+ struct xrow_header **end, struct iovec **iovec);
/**
* Xrow buffer cursor used to search a position in a buffer
--
2.25.0
More information about the Tarantool-patches
mailing list