From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from smtp33.i.mail.ru (smtp33.i.mail.ru [94.100.177.93]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id 4619D4696CB for ; Wed, 12 Feb 2020 12:39:29 +0300 (MSK) From: Georgy Kirichenko Date: Wed, 12 Feb 2020 12:39:19 +0300 Message-Id: In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Subject: [Tarantool-patches] [PATCH v4 10/11] wal: use a xrow buffer object for entry encoding List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: tarantool-patches@dev.tarantool.org WAL uses an xrow buffer object to encode transactions and then writes the encoded data to a log file, so the encoded data still lives in memory for some time after a transaction is finished. Part of #3794, #980 --- src/box/wal.c | 51 ++++++++++++++++++++++++++++++++++++++++- src/box/xlog.c | 57 +++++++++++++++++++++++++++++----------------- src/box/xlog.h | 14 ++++++++++++ src/box/xrow_buf.h | 3 +-- 4 files changed, 101 insertions(+), 24 deletions(-) diff --git a/src/box/wal.c b/src/box/wal.c index ad3c79a8a..b483b8cc4 100644 --- a/src/box/wal.c +++ b/src/box/wal.c @@ -44,6 +44,7 @@ #include "coio_task.h" #include "replication.h" #include "mclock.h" +#include "xrow_buf.h" enum { /** @@ -174,6 +175,14 @@ struct wal_writer * collecting. Ignored in case of no space error. */ struct vclock gc_first_vclock; + /** + * In-memory WAL write buffer used to encode transaction rows and + * write them to an xlog file. An in-memory buffer allows us to + * preserve xrows after transaction processing was finished. + * This buffer will be used by replication to fetch xrows from memory + * without xlog files access. 
+ */ + struct xrow_buf xrow_buf; }; struct wal_msg { @@ -1040,6 +1049,7 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base, int64_t tsn = 0; /** Assign LSN to all local rows. */ for ( ; row < end; row++) { + (*row)->tm = ev_now(loop()); if ((*row)->replica_id == 0) { (*row)->lsn = vclock_inc(vclock_diff, instance_id) + vclock_get(base, instance_id); @@ -1067,6 +1077,35 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base, * the output queue. The function returns count of written * bytes or -1 in case of error. */ +static ssize_t +wal_encode_write_entry(struct wal_writer *writer, struct journal_entry *entry) +{ + struct errinj *inj = errinj(ERRINJ_WAL_BREAK_LSN, ERRINJ_INT); + if (inj != NULL) { + for (struct xrow_header **row = entry->rows; + row < entry->rows + entry->n_rows; ++row) { + if (inj->iparam == (*row)->lsn) { + (*row)->lsn = inj->iparam - 1; + say_warn("injected broken lsn: %lld", + (long long) (*row)->lsn); + break; + } + } + } + + struct iovec *iov; + int iov_cnt = xrow_buf_write(&writer->xrow_buf, entry->rows, + entry->rows + entry->n_rows, &iov); + if (iov_cnt < 0) + return -1; + xlog_tx_begin(&writer->current_wal); + ssize_t rc = xlog_write_iov(&writer->current_wal, iov, iov_cnt, + entry->n_rows); + if (rc < 0) + return rc; + return xlog_tx_commit(&writer->current_wal); +} + static ssize_t wal_write_xlog_batch(struct wal_writer *writer, struct stailq *input, struct stailq *output, struct vclock *vclock_diff) @@ -1082,7 +1121,7 @@ wal_write_xlog_batch(struct wal_writer *writer, struct stailq *input, entry->rows, entry->rows + entry->n_rows); entry->res = vclock_sum(vclock_diff) + vclock_sum(&writer->vclock); - rc = xlog_write_entry(l, entry); + rc = wal_encode_write_entry(writer, entry); } while (rc == 0 && !stailq_empty(input)); /* If log was not flushed then flush it explicitly. 
*/ if (rc == 0) @@ -1155,9 +1194,12 @@ wal_write_to_disk(struct cmsg *msg) struct stailq output; stailq_create(&output); while (!stailq_empty(&input)) { + /* Start a wal memory buffer transaction. */ + xrow_buf_tx_begin(&writer->xrow_buf, &writer->vclock); ssize_t rc = wal_write_xlog_batch(writer, &input, &output, &vclock_diff); if (rc < 0) { + xrow_buf_tx_rollback(&writer->xrow_buf); /* * Put processed entries and tail of write * queue to a rollback list. @@ -1165,6 +1207,7 @@ wal_write_to_disk(struct cmsg *msg) stailq_concat(&wal_msg->rollback, &output); stailq_concat(&wal_msg->rollback, &input); } else { + xrow_buf_tx_commit(&writer->xrow_buf); /* * Schedule processed entries to commit * and update the wal vclock. @@ -1254,6 +1297,11 @@ wal_writer_f(va_list ap) { (void) ap; struct wal_writer *writer = &wal_writer_singleton; + /* + * Initialize writer memory buffer here because it + * should be done in the wal thread. + */ + xrow_buf_create(&writer->xrow_buf); /** Initialize eio in this thread */ coio_enable(); @@ -1300,6 +1348,7 @@ wal_writer_f(va_list ap) xlog_close(&vy_log_writer.xlog, false); cpipe_destroy(&writer->tx_prio_pipe); + xrow_buf_destroy(&writer->xrow_buf); return 0; } diff --git a/src/box/xlog.c b/src/box/xlog.c index 8254cce20..2fcf7f0df 100644 --- a/src/box/xlog.c +++ b/src/box/xlog.c @@ -1275,14 +1275,8 @@ xlog_tx_write(struct xlog *log) return written; } -/* - * Add a row to a log and possibly flush the log. - * - * @retval -1 error, check diag. - * @retval >=0 the number of bytes written to buffer. 
- */ -ssize_t -xlog_write_row(struct xlog *log, const struct xrow_header *packet) +static int +xlog_write_prepare(struct xlog *log) { /* * Automatically reserve space for a fixheader when adding @@ -1296,17 +1290,20 @@ xlog_write_row(struct xlog *log, const struct xrow_header *packet) return -1; } } + return 0; +} - struct obuf_svp svp = obuf_create_svp(&log->obuf); - size_t page_offset = obuf_size(&log->obuf); - /** encode row into iovec */ - struct iovec iov[XROW_IOVMAX]; - /** don't write sync to the disk */ - int iovcnt = xrow_header_encode(packet, 0, iov, 0); - if (iovcnt < 0) { - obuf_rollback_to_svp(&log->obuf, &svp); +/* + * Write an xrow containing iovec to a xlog. + */ +ssize_t +xlog_write_iov(struct xlog *log, struct iovec *iov, int iovcnt, int row_count) +{ + if (xlog_write_prepare(log) != 0) return -1; - } + + struct obuf_svp svp = obuf_create_svp(&log->obuf); + size_t old_size = obuf_size(&log->obuf); for (int i = 0; i < iovcnt; ++i) { struct errinj *inj = errinj(ERRINJ_WAL_WRITE_PARTIAL, ERRINJ_INT); @@ -1325,16 +1322,34 @@ xlog_write_row(struct xlog *log, const struct xrow_header *packet) return -1; } } - assert(iovcnt <= XROW_IOVMAX); - log->tx_rows++; + log->tx_rows += row_count; - size_t row_size = obuf_size(&log->obuf) - page_offset; + ssize_t written = obuf_size(&log->obuf) - old_size; if (log->is_autocommit && obuf_size(&log->obuf) >= XLOG_TX_AUTOCOMMIT_THRESHOLD && xlog_tx_write(log) < 0) return -1; - return row_size; + return written; +} + +/* + * Add a row to a log and possibly flush the log. + * + * @retval -1 error, check diag. + * @retval >=0 the number of bytes written to buffer. 
+ */ +ssize_t +xlog_write_row(struct xlog *log, const struct xrow_header *packet) +{ + /** encode row into an iovec */ + struct iovec iov[XROW_IOVMAX]; + /** don't write sync to the disk */ + int iovcnt = xrow_header_encode(packet, 0, iov, 0); + if (iovcnt < 0) + return -1; + assert(iovcnt <= XROW_IOVMAX); + return xlog_write_iov(log, iov, iovcnt, 1); } /** diff --git a/src/box/xlog.h b/src/box/xlog.h index a48b05fc4..d0f97f0e4 100644 --- a/src/box/xlog.h +++ b/src/box/xlog.h @@ -501,6 +501,20 @@ xlog_fallocate(struct xlog *log, size_t size); ssize_t xlog_write_row(struct xlog *log, const struct xrow_header *packet); +/** + * Write an iov vector with rows into an xlog. + * + * @param xlog an xlog file to write into + * @param iov an iovec with encoded rows data + * @param iovcnt count of iovec members + * @param row_count count of encoded rows + * + * @retval count of written bytes + * @retval -1 for error + */ +ssize_t +xlog_write_iov(struct xlog *xlog, struct iovec *iov, int iovcnt, int row_count); + /** * Prevent xlog row buffer offloading, should be use * at transaction start to write transaction in one xlog tx diff --git a/src/box/xrow_buf.h b/src/box/xrow_buf.h index c5f624d45..d286a94d3 100644 --- a/src/box/xrow_buf.h +++ b/src/box/xrow_buf.h @@ -154,8 +154,7 @@ xrow_buf_tx_commit(struct xrow_buf *xrow_buf); */ int xrow_buf_write(struct xrow_buf *xrow_buf, struct xrow_header **begin, - struct xrow_header **end, - struct iovec **iovec); + struct xrow_header **end, struct iovec **iovec); /** * Xrow buffer cursor used to search a position in a buffer -- 2.25.0