[PATCH v2 2/4] wal: preallocate disk space before writing rows
Vladimir Davydov
vdavydov.dev at gmail.com
Tue Oct 23 20:26:32 MSK 2018
This patch introduces a new xlog method xlog_fallocate() that makes
sure that the requested amount of disk space is available at the current
write position. It does that with posix_fallocate(). The new method is
called before writing anything to WAL, see wal_fallocate(). In order not
to invoke the system call too often, wal_fallocate() allocates disk
space in big chunks (1 MB).
The reason why I'm doing this is that I want to have a single and
clearly defined point in the code to handle ENOSPC errors, where
I could delete old WALs and retry (this is what #3397 is about).
Needed for #3397
---
CMakeLists.txt | 1 +
src/box/journal.c | 1 +
src/box/journal.h | 4 ++++
src/box/txn.c | 1 +
src/box/wal.c | 50 +++++++++++++++++++++++++++++++++++++++++++++++
src/box/xlog.c | 45 ++++++++++++++++++++++++++++++++++++++++++
src/box/xlog.h | 16 +++++++++++++++
src/box/xrow.h | 13 ++++++++++++
src/trivia/config.h.cmake | 1 +
9 files changed, 132 insertions(+)
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 439a2750..c61d5569 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -82,6 +82,7 @@ check_symbol_exists(fdatasync unistd.h HAVE_FDATASYNC)
check_symbol_exists(pthread_yield pthread.h HAVE_PTHREAD_YIELD)
check_symbol_exists(sched_yield sched.h HAVE_SCHED_YIELD)
check_symbol_exists(posix_fadvise fcntl.h HAVE_POSIX_FADVISE)
+check_symbol_exists(posix_fallocate fcntl.h HAVE_POSIX_FALLOCATE)
check_symbol_exists(mremap sys/mman.h HAVE_MREMAP)
check_function_exists(sync_file_range HAVE_SYNC_FILE_RANGE)
diff --git a/src/box/journal.c b/src/box/journal.c
index fd4f9539..99a521c3 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -66,6 +66,7 @@ journal_entry_new(size_t n_rows)
diag_set(OutOfMemory, size, "region", "struct journal_entry");
return NULL;
}
+ entry->len = 0;
entry->n_rows = n_rows;
entry->res = -1;
entry->fiber = fiber();
diff --git a/src/box/journal.h b/src/box/journal.h
index 1d64a7bd..fc495547 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -59,6 +59,10 @@ struct journal_entry {
*/
struct fiber *fiber;
/**
+ * Max size the rows are going to take when encoded.
+ */
+ size_t len;
+ /**
* The number of rows in the request.
*/
int n_rows;
diff --git a/src/box/txn.c b/src/box/txn.c
index 17d97d76..9b465561 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -272,6 +272,7 @@ txn_write_to_wal(struct txn *txn)
if (stmt->row == NULL)
continue; /* A read (e.g. select) request */
*row++ = stmt->row;
+ req->len += xrow_len_max(stmt->row);
}
assert(row == req->rows + req->n_rows);
diff --git a/src/box/wal.c b/src/box/wal.c
index f87b40ae..baa3af65 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -44,6 +44,17 @@
#include "coio_task.h"
#include "replication.h"
+enum {
+ /**
+ * Size of disk space to preallocate with xlog_fallocate().
+ * Obviously, we want to call this function as infrequently
+ * as possible to avoid the overhead associated with a system
+ * call; at the same time, we do not want it to allocate
+ * chunks that are too big, because this may increase tx
+ * latency. 1 MB seems to be a well-balanced choice.
+ */
+ WAL_FALLOCATE_LEN = 1024 * 1024,
+};
const char *wal_mode_STRS[] = { "none", "write", "fsync", NULL };
@@ -126,6 +137,11 @@ struct wal_writer
struct wal_msg {
struct cmsg base;
+ /**
+ * Max size the committed requests are going to take when
+ * written to disk.
+ */
+ size_t len;
/** Input queue, on output contains all committed requests. */
struct stailq commit;
/**
@@ -168,6 +184,7 @@ static void
wal_msg_create(struct wal_msg *batch)
{
cmsg_init(&batch->base, wal_request_route);
+ batch->len = 0;
stailq_create(&batch->commit);
stailq_create(&batch->rollback);
}
@@ -603,6 +620,32 @@ wal_opt_rotate(struct wal_writer *writer)
return 0;
}
+/**
+ * Make sure there's enough disk space to append @len bytes of data
+ * to the current WAL. Returns 0 on success, -1 on failure.
+ */
+static int
+wal_fallocate(struct wal_writer *writer, size_t len)
+{
+ struct xlog *l = &writer->current_wal;
+
+ /*
+ * The actual write size can be greater than the total size
+ * of encoded rows (compression, fixheaders). Double the
+ * given length to get a rough upper bound estimate.
+ */
+ len *= 2;
+
+ if (l->alloc_len >= len)
+ return 0; /* enough space is preallocated already */
+ size_t alloc_len = MAX(WAL_FALLOCATE_LEN, len); /* never less than one chunk */
+ if (xlog_fallocate(l, alloc_len) == 0)
+ return 0;
+
+ diag_log(); /* xlog_fallocate() has set diag on failure */
+ return -1;
+}
+
static void
wal_writer_clear_bus(struct cmsg *msg)
{
@@ -689,6 +732,12 @@ wal_write_to_disk(struct cmsg *msg)
return wal_writer_begin_rollback(writer);
}
+ /* Ensure there's enough disk space before writing anything. */
+ if (wal_fallocate(writer, wal_msg->len) != 0) {
+ stailq_concat(&wal_msg->rollback, &wal_msg->commit);
+ return wal_writer_begin_rollback(writer);
+ }
+
/*
* This code tries to write queued requests (=transactions) using as
* few I/O syscalls and memory copies as possible. For this reason
@@ -858,6 +907,7 @@ wal_write(struct journal *journal, struct journal_entry *entry)
stailq_add_tail_entry(&batch->commit, entry, fifo);
cpipe_push(&wal_thread.wal_pipe, &batch->base);
}
+ batch->len += entry->len;
wal_thread.wal_pipe.n_input += entry->n_rows * XROW_IOVMAX;
cpipe_flush_input(&wal_thread.wal_pipe);
/**
diff --git a/src/box/xlog.c b/src/box/xlog.c
index cd2d6e1d..6461e607 100644
--- a/src/box/xlog.c
+++ b/src/box/xlog.c
@@ -990,6 +990,26 @@ xdir_create_xlog(struct xdir *dir, struct xlog *xlog,
return 0;
}
+ssize_t
+xlog_fallocate(struct xlog *log, size_t len)
+{
+#ifdef HAVE_POSIX_FALLOCATE
+ int rc = posix_fallocate(log->fd, log->offset + log->alloc_len, len); /* start right after the already preallocated space */
+ if (rc != 0) {
+ errno = rc; /* rc is the error number itself, store it in errno before setting diag */
+ diag_set(SystemError, "%s: can't allocate disk space",
+ log->filename);
+ return -1;
+ }
+ log->alloc_len += len; /* account for the newly preallocated space */
+ return 0;
+#else
+ (void)log; /* no-op when posix_fallocate() was not found at build time */
+ (void)len;
+ return 0;
+#endif /* HAVE_POSIX_FALLOCATE */
+}
+
/**
* Write a sequence of uncompressed xrow objects.
*
@@ -1179,8 +1199,13 @@ xlog_tx_write(struct xlog *log)
if (lseek(log->fd, log->offset, SEEK_SET) < 0 ||
ftruncate(log->fd, log->offset) != 0)
panic_syserror("failed to truncate xlog after write error");
+ log->alloc_len = 0;
return -1;
}
+ if (log->alloc_len > (size_t)written)
+ log->alloc_len -= written;
+ else
+ log->alloc_len = 0;
log->offset += written;
log->rows += log->tx_rows;
log->tx_rows = 0;
@@ -1378,6 +1403,17 @@ xlog_write_eof(struct xlog *l)
diag_set(ClientError, ER_INJECTION, "xlog write injection");
return -1;
});
+
+ /*
+ * Free disk space preallocated with xlog_fallocate().
+ * Don't write the eof marker if this fails, otherwise
+ * we'll get "data after eof marker" error on recovery.
+ */
+ if (l->alloc_len > 0 && ftruncate(l->fd, l->offset) < 0) {
+ diag_set(SystemError, "ftruncate() failed");
+ return -1;
+ }
+
if (fio_writen(l->fd, &eof_marker, sizeof(eof_marker)) < 0) {
diag_set(SystemError, "write() failed");
return -1;
@@ -1793,6 +1829,15 @@ xlog_cursor_next_tx(struct xlog_cursor *i)
return -1;
if (rc > 0)
return 1;
+ if (load_u32(i->rbuf.rpos) == 0) {
+ /*
+ * Space preallocated with xlog_fallocate().
+ * Treat as eof and clear the buffer.
+ */
+ i->read_offset -= ibuf_used(&i->rbuf);
+ ibuf_reset(&i->rbuf);
+ return 1;
+ }
if (load_u32(i->rbuf.rpos) == eof_marker) {
/* eof marker found */
goto eof_found;
diff --git a/src/box/xlog.h b/src/box/xlog.h
index e3c38486..d08ef16f 100644
--- a/src/box/xlog.h
+++ b/src/box/xlog.h
@@ -314,6 +314,11 @@ struct xlog {
/** The current offset in the log file, for writing. */
off_t offset;
/**
+ * Size of disk space preallocated at @offset with
+ * xlog_fallocate().
+ */
+ size_t alloc_len;
+ /**
* Output buffer, works as row accumulator for
* compression.
*/
@@ -434,6 +439,17 @@ int
xlog_rename(struct xlog *l);
/**
+ * Allocate @size bytes of disk space in the given xlog file,
+ * right after the space that has already been preallocated.
+ *
+ * Returns -1 on fallocate error and sets both diag and errno
+ * accordingly. On success returns 0. If posix_fallocate() is
+ * not available, this function is a no-op and also returns 0.
+ */
+ssize_t
+xlog_fallocate(struct xlog *log, size_t size);
+
+/**
* Write a row to xlog,
*
* @retval count of writen bytes
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 3fc007a8..b73b1f2f 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -68,6 +68,19 @@ struct xrow_header {
};
/**
+ * Return an upper bound on the size of the given row when encoded
+ * into a binary packet: XROW_HEADER_LEN_MAX plus all body iovecs.
+ */
+static inline size_t
+xrow_len_max(struct xrow_header *row)
+{
+ size_t len = XROW_HEADER_LEN_MAX;
+ for (int i = 0; i < row->bodycnt; i++)
+ len += row->body[i].iov_len;
+ return len;
+}
+
+/**
* Encode xrow into a binary packet
*
* @param header xrow
diff --git a/src/trivia/config.h.cmake b/src/trivia/config.h.cmake
index 8894b436..53eae2fe 100644
--- a/src/trivia/config.h.cmake
+++ b/src/trivia/config.h.cmake
@@ -166,6 +166,7 @@
#cmakedefine HAVE_PTHREAD_YIELD 1
#cmakedefine HAVE_SCHED_YIELD 1
#cmakedefine HAVE_POSIX_FADVISE 1
+#cmakedefine HAVE_POSIX_FALLOCATE 1
#cmakedefine HAVE_MREMAP 1
#cmakedefine HAVE_SYNC_FILE_RANGE 1
--
2.11.0
More information about the Tarantool-patches
mailing list