From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: From: Vladimir Davydov Subject: [PATCH v2 2/4] wal: preallocate disk space before writing rows Date: Tue, 23 Oct 2018 20:26:32 +0300 Message-Id: <484c921eb04a0a94563dcccf058bb5e8f96d3a49.1540314925.git.vdavydov.dev@gmail.com> In-Reply-To: References: In-Reply-To: References: To: kostja@tarantool.org Cc: tarantool-patches@freelists.org List-ID: This function introduces a new xlog method xlog_fallocate() that makes sure that the requested amount of disk space is available at the current write position. It does that with posix_fallocate(). The new method is called before writing anything to WAL, see wal_fallocate(). In order not to invoke the system call too often, wal_fallocate() allocates disk space in big chunks (1 MB). The reason why I'm doing this is that I want to have a single and clearly defined point in the code to handle ENOSPC errors, where I could delete old WALs and retry (this is what #3397 is about). Needed for #3397 --- CMakeLists.txt | 1 + src/box/journal.c | 1 + src/box/journal.h | 4 ++++ src/box/txn.c | 1 + src/box/wal.c | 50 +++++++++++++++++++++++++++++++++++++++++++++++ src/box/xlog.c | 45 ++++++++++++++++++++++++++++++++++++++++++ src/box/xlog.h | 16 +++++++++++++++ src/box/xrow.h | 13 ++++++++++++ src/trivia/config.h.cmake | 1 + 9 files changed, 132 insertions(+) diff --git a/CMakeLists.txt b/CMakeLists.txt index 439a2750..c61d5569 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -82,6 +82,7 @@ check_symbol_exists(fdatasync unistd.h HAVE_FDATASYNC) check_symbol_exists(pthread_yield pthread.h HAVE_PTHREAD_YIELD) check_symbol_exists(sched_yield sched.h HAVE_SCHED_YIELD) check_symbol_exists(posix_fadvise fcntl.h HAVE_POSIX_FADVISE) +check_symbol_exists(posix_fallocate fcntl.h HAVE_POSIX_FALLOCATE) check_symbol_exists(mremap sys/mman.h HAVE_MREMAP) check_function_exists(sync_file_range HAVE_SYNC_FILE_RANGE) diff --git a/src/box/journal.c b/src/box/journal.c index fd4f9539..99a521c3 100644 --- a/src/box/journal.c +++ b/src/box/journal.c @@ -66,6 +66,7 @@ journal_entry_new(size_t n_rows) diag_set(OutOfMemory, size, "region", "struct journal_entry"); return NULL; } + entry->len = 0; entry->n_rows = n_rows; entry->res = -1; entry->fiber = fiber(); diff --git a/src/box/journal.h b/src/box/journal.h index 1d64a7bd..fc495547 100644 --- a/src/box/journal.h +++ b/src/box/journal.h @@ -59,6 +59,10 @@ struct journal_entry { */ struct fiber *fiber; /** + * Max size the rows are going to take when encoded. + */ + size_t len; + /** * The number of rows in the request. */ int n_rows; diff --git a/src/box/txn.c b/src/box/txn.c index 17d97d76..9b465561 100644 --- a/src/box/txn.c +++ b/src/box/txn.c @@ -272,6 +272,7 @@ txn_write_to_wal(struct txn *txn) if (stmt->row == NULL) continue; /* A read (e.g. select) request */ *row++ = stmt->row; + req->len += xrow_len_max(stmt->row); } assert(row == req->rows + req->n_rows); diff --git a/src/box/wal.c b/src/box/wal.c index f87b40ae..baa3af65 100644 --- a/src/box/wal.c +++ b/src/box/wal.c @@ -44,6 +44,17 @@ #include "coio_task.h" #include "replication.h" +enum { + /** + * Size of disk space to preallocate with xlog_fallocate(). + * Obviously, we want to call this function as infrequent as + * possible to avoid the overhead associated with a system + * call, however at the same time we do not want to call it + * to allocate too big chunks, because this may increase tx + * latency. 1 MB seems to be a well balanced choice. + */ + WAL_FALLOCATE_LEN = 1024 * 1024, +}; const char *wal_mode_STRS[] = { "none", "write", "fsync", NULL }; @@ -126,6 +137,11 @@ struct wal_writer struct wal_msg { struct cmsg base; + /** + * Max size the committed requests are going to take when + * written to disk. + */ + size_t len; /** Input queue, on output contains all committed requests. */ struct stailq commit; /** @@ -168,6 +184,7 @@ static void wal_msg_create(struct wal_msg *batch) { cmsg_init(&batch->base, wal_request_route); + batch->len = 0; stailq_create(&batch->commit); stailq_create(&batch->rollback); } @@ -603,6 +620,32 @@ wal_opt_rotate(struct wal_writer *writer) return 0; } +/** + * Make sure there's enough disk space to append @len bytes + * of data to the current WAL. + */ +static int +wal_fallocate(struct wal_writer *writer, size_t len) +{ + struct xlog *l = &writer->current_wal; + + /* + * The actual write size can be greater than the sum size + * of encoded rows (compression, fixheaders). Double the + * given length to get a rough upper bound estimate. + */ + len *= 2; + + if (l->alloc_len >= len) + return 0; + size_t alloc_len = MAX(WAL_FALLOCATE_LEN, len); + if (xlog_fallocate(l, alloc_len) == 0) + return 0; + + diag_log(); + return -1; +} + static void wal_writer_clear_bus(struct cmsg *msg) { @@ -689,6 +732,12 @@ wal_write_to_disk(struct cmsg *msg) return wal_writer_begin_rollback(writer); } + /* Ensure there's enough disk space before writing anything. */ + if (wal_fallocate(writer, wal_msg->len) != 0) { + stailq_concat(&wal_msg->rollback, &wal_msg->commit); + return wal_writer_begin_rollback(writer); + } + /* * This code tries to write queued requests (=transactions) using as * few I/O syscalls and memory copies as possible. For this reason @@ -858,6 +907,7 @@ wal_write(struct journal *journal, struct journal_entry *entry) stailq_add_tail_entry(&batch->commit, entry, fifo); cpipe_push(&wal_thread.wal_pipe, &batch->base); } + batch->len += entry->len; wal_thread.wal_pipe.n_input += entry->n_rows * XROW_IOVMAX; cpipe_flush_input(&wal_thread.wal_pipe); /** diff --git a/src/box/xlog.c b/src/box/xlog.c index cd2d6e1d..6461e607 100644 --- a/src/box/xlog.c +++ b/src/box/xlog.c @@ -990,6 +990,26 @@ xdir_create_xlog(struct xdir *dir, struct xlog *xlog, return 0; } +ssize_t +xlog_fallocate(struct xlog *log, size_t len) +{ +#ifdef HAVE_POSIX_FALLOCATE + int rc = posix_fallocate(log->fd, log->offset + log->alloc_len, len); + if (rc != 0) { + errno = rc; + diag_set(SystemError, "%s: can't allocate disk space", + log->filename); + return -1; + } + log->alloc_len += len; + return 0; +#else + (void)log; + (void)len; + return 0; +#endif /* HAVE_POSIX_FALLOCATE */ +} + /** * Write a sequence of uncompressed xrow objects. * @@ -1179,8 +1199,13 @@ xlog_tx_write(struct xlog *log) if (lseek(log->fd, log->offset, SEEK_SET) < 0 || ftruncate(log->fd, log->offset) != 0) panic_syserror("failed to truncate xlog after write error"); + log->alloc_len = 0; return -1; } + if (log->alloc_len > (size_t)written) + log->alloc_len -= written; + else + log->alloc_len = 0; log->offset += written; log->rows += log->tx_rows; log->tx_rows = 0; @@ -1378,6 +1403,17 @@ xlog_write_eof(struct xlog *l) diag_set(ClientError, ER_INJECTION, "xlog write injection"); return -1; }); + + /* + * Free disk space preallocated with xlog_fallocate(). + * Don't write the eof marker if this fails, otherwise + * we'll get "data after eof marker" error on recovery. + */ + if (l->alloc_len > 0 && ftruncate(l->fd, l->offset) < 0) { + diag_set(SystemError, "ftruncate() failed"); + return -1; + } + if (fio_writen(l->fd, &eof_marker, sizeof(eof_marker)) < 0) { diag_set(SystemError, "write() failed"); return -1; @@ -1793,6 +1829,15 @@ xlog_cursor_next_tx(struct xlog_cursor *i) return -1; if (rc > 0) return 1; + if (load_u32(i->rbuf.rpos) == 0) { + /* + * Space preallocated with xlog_fallocate(). + * Treat as eof and clear the buffer. + */ + i->read_offset -= ibuf_used(&i->rbuf); + ibuf_reset(&i->rbuf); + return 1; + } if (load_u32(i->rbuf.rpos) == eof_marker) { /* eof marker found */ goto eof_found; diff --git a/src/box/xlog.h b/src/box/xlog.h index e3c38486..d08ef16f 100644 --- a/src/box/xlog.h +++ b/src/box/xlog.h @@ -314,6 +314,11 @@ struct xlog { /** The current offset in the log file, for writing. */ off_t offset; /** + * Size of disk space preallocated at @offset with + * xlog_fallocate(). + */ + size_t alloc_len; + /** * Output buffer, works as row accumulator for * compression. */ @@ -434,6 +439,17 @@ int xlog_rename(struct xlog *l); /** + * Allocate @size bytes of disk space at the end of the given + * xlog file. + * + * Returns -1 on fallocate error and sets both diag and errno + * accordingly. On success returns 0. If the underlying OS + * does not support fallocate, this function also returns 0. + */ +ssize_t +xlog_fallocate(struct xlog *log, size_t size); + +/** * Write a row to xlog, * * @retval count of writen bytes diff --git a/src/box/xrow.h b/src/box/xrow.h index 3fc007a8..b73b1f2f 100644 --- a/src/box/xrow.h +++ b/src/box/xrow.h @@ -68,6 +68,19 @@ struct xrow_header { }; /** + * Return the max size which the given row is going to take when + * encoded into a binary packet. + */ +static inline size_t +xrow_len_max(struct xrow_header *row) +{ + size_t len = XROW_HEADER_LEN_MAX; + for (int i = 0; i < row->bodycnt; i++) + len += row->body[i].iov_len; + return len; +} + +/** * Encode xrow into a binary packet * * @param header xrow diff --git a/src/trivia/config.h.cmake b/src/trivia/config.h.cmake index 8894b436..53eae2fe 100644 --- a/src/trivia/config.h.cmake +++ b/src/trivia/config.h.cmake @@ -166,6 +166,7 @@ #cmakedefine HAVE_PTHREAD_YIELD 1 #cmakedefine HAVE_SCHED_YIELD 1 #cmakedefine HAVE_POSIX_FADVISE 1 +#cmakedefine HAVE_POSIX_FALLOCATE 1 #cmakedefine HAVE_MREMAP 1 #cmakedefine HAVE_SYNC_FILE_RANGE 1 -- 2.11.0