[PATCH 2/5] wal: preallocate disk space before writing rows

Vladimir Davydov vdavydov.dev at gmail.com
Sun Oct 7 23:27:15 MSK 2018


This patch introduces a new xlog method, xlog_fallocate(), that makes
sure that the requested amount of disk space is available at the current
write position. It does that with posix_fallocate(). The new method is
called before writing anything to WAL. In order not to invoke a system
call too often, xlog_fallocate() allocates more than requested.

The primary reason why I'm doing this is that I want to have a single
and clearly defined point in the code to handle ENOSPC errors, where I
could delete old WALs and retry (this is what #3397 is about). I could
probably handle ENOSPC returned by xlog_tx_commit(), but that would look
suspicious, because this function can write half a transaction before it
hits ENOSPC, after which it truncates the file back. It's unclear what
happens if, for instance, a replication thread reads that transient
data.

Anyway, preallocating disk space in big chunks is a worthwhile feature
by itself, because it should reduce the number of writes to the inode
table.

Needed for #3397
---
 CMakeLists.txt            |  1 +
 src/box/journal.c         |  1 +
 src/box/journal.h         |  4 +++
 src/box/txn.c             |  1 +
 src/box/wal.c             | 27 ++++++++++++++
 src/box/xlog.c            | 91 ++++++++++++++++++++++++++++++++++++++++++-----
 src/box/xlog.h            | 20 +++++++++++
 src/box/xrow.h            | 13 +++++++
 src/trivia/config.h.cmake |  1 +
 9 files changed, 151 insertions(+), 8 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index bf68d187..3d12f3ff 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -80,6 +80,7 @@ check_symbol_exists(fdatasync unistd.h HAVE_FDATASYNC)
 check_symbol_exists(pthread_yield pthread.h HAVE_PTHREAD_YIELD)
 check_symbol_exists(sched_yield sched.h HAVE_SCHED_YIELD)
 check_symbol_exists(posix_fadvise fcntl.h HAVE_POSIX_FADVISE)
+check_symbol_exists(posix_fallocate fcntl.h HAVE_POSIX_FALLOCATE)
 check_symbol_exists(mremap sys/mman.h HAVE_MREMAP)
 
 check_function_exists(sync_file_range HAVE_SYNC_FILE_RANGE)
diff --git a/src/box/journal.c b/src/box/journal.c
index fd4f9539..99a521c3 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -66,6 +66,7 @@ journal_entry_new(size_t n_rows)
 		diag_set(OutOfMemory, size, "region", "struct journal_entry");
 		return NULL;
 	}
+	entry->len = 0;
 	entry->n_rows = n_rows;
 	entry->res = -1;
 	entry->fiber = fiber();
diff --git a/src/box/journal.h b/src/box/journal.h
index 1d64a7bd..fc495547 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -59,6 +59,10 @@ struct journal_entry {
 	 */
 	struct fiber *fiber;
 	/**
+	 * Upper bound on the size the rows will take when encoded.
+	 */
+	size_t len;
+	/**
 	 * The number of rows in the request.
 	 */
 	int n_rows;
diff --git a/src/box/txn.c b/src/box/txn.c
index 17d97d76..9b465561 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -272,6 +272,7 @@ txn_write_to_wal(struct txn *txn)
 		if (stmt->row == NULL)
 			continue; /* A read (e.g. select) request */
 		*row++ = stmt->row;
+		req->len += xrow_len_max(stmt->row);
 	}
 	assert(row == req->rows + req->n_rows);
 
diff --git a/src/box/wal.c b/src/box/wal.c
index 2a1353b0..91c16fb0 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -126,6 +126,11 @@ struct wal_writer
 
 struct wal_msg {
 	struct cmsg base;
+	/**
+	 * Upper bound on the size the committed requests are
+	 * going to take when written to disk.
+	 */
+	size_t len;
 	/** Input queue, on output contains all committed requests. */
 	struct stailq commit;
 	/**
@@ -168,6 +173,7 @@ static void
 wal_msg_create(struct wal_msg *batch)
 {
 	cmsg_init(&batch->base, wal_request_route);
+	batch->len = 0;
 	stailq_create(&batch->commit);
 	stailq_create(&batch->rollback);
 }
@@ -603,6 +609,20 @@ wal_opt_rotate(struct wal_writer *writer)
 	return 0;
 }
 
+/**
+ * Ensure @len bytes of disk space are available in the current
+ * WAL. On failure, log the error and return -1; else return 0.
+ */
+static int
+wal_fallocate(struct wal_writer *writer, size_t len)
+{
+	if (xlog_fallocate(&writer->current_wal, len) < 0) {
+		diag_log();
+		return -1;
+	}
+	return 0;
+}
+
 static void
 wal_writer_clear_bus(struct cmsg *msg)
 {
@@ -689,6 +709,12 @@ wal_write_to_disk(struct cmsg *msg)
 		return wal_writer_begin_rollback(writer);
 	}
 
+	/* Ensure there's enough disk space before writing anything. */
+	if (wal_fallocate(writer, wal_msg->len) != 0) {
+		stailq_concat(&wal_msg->rollback, &wal_msg->commit);
+		return wal_writer_begin_rollback(writer);
+	}
+
 	/*
 	 * This code tries to write queued requests (=transactions) using as
 	 * few I/O syscalls and memory copies as possible. For this reason
@@ -858,6 +884,7 @@ wal_write(struct journal *journal, struct journal_entry *entry)
 		stailq_add_tail_entry(&batch->commit, entry, fifo);
 		cpipe_push(&wal_thread.wal_pipe, &batch->base);
 	}
+	batch->len += entry->len; /* NOTE(review): in the else branch cpipe_push() may already have handed the batch to the WAL thread; confirm this cannot race with wal_write_to_disk() reading batch->len. */
 	wal_thread.wal_pipe.n_input += entry->n_rows * XROW_IOVMAX;
 	cpipe_flush_input(&wal_thread.wal_pipe);
 	/**
diff --git a/src/box/xlog.c b/src/box/xlog.c
index de5e52f7..292c462c 100644
--- a/src/box/xlog.c
+++ b/src/box/xlog.c
@@ -76,6 +76,22 @@ enum {
 	 * Maybe this should be a configuration option.
 	 */
 	XLOG_TX_COMPRESS_THRESHOLD = 2 * 1024,
+	/**
+	 * Minimal number of bytes of disk space to allocate
+	 * with xlog_fallocate(). Obviously, we want to invoke
+	 * fallocate() as rarely as possible to avoid the overhead
+	 * associated with a system call, however at the same
+	 * time we do not want to call it to allocate too big
+	 * chunks, because this may increase tx latency.
+	 */
+	XLOG_FALLOCATE_MIN = 128 * 1024,
+	/**
+	 * Allocate at least XLOG_FALLOCATE_FACTOR * size bytes
+	 * when xlog_fallocate(size) is called so that we do
+	 * not incur the overhead of an extra syscall for each
+	 * committed transaction.
+	 */
+	XLOG_FALLOCATE_FACTOR = 8,
 };
 
 /* {{{ struct xlog_meta */
@@ -988,6 +1004,57 @@ xdir_create_xlog(struct xdir *dir, struct xlog *xlog,
 	return 0;
 }
 
+/*
+ * Simplify recovery after a temporary write failure:
+ * truncate the file to the best known good write position.
+ * This also discards any space preallocated with
+ * xlog_fallocate(), so reset alloc_len accordingly.
+ */
+static void
+xlog_write_error(struct xlog *log)
+{
+	if (lseek(log->fd, log->offset, SEEK_SET) < 0 ||
+	    ftruncate(log->fd, log->offset) != 0)
+		panic_syserror("failed to truncate xlog after write error");
+	log->alloc_len = 0;
+}
+
+ssize_t
+xlog_fallocate(struct xlog *log, size_t len)
+{
+#ifdef HAVE_POSIX_FALLOCATE
+	/* Enough space has already been preallocated. */
+	if (log->alloc_len >= len)
+		return log->alloc_len;
+
+	/*
+	 * Allocate more than requested so that we don't have
+	 * to call fallocate() on each write. Since alloc_len
+	 * is less than len here, the subtraction can't wrap.
+	 */
+	len = len * XLOG_FALLOCATE_FACTOR - log->alloc_len;
+	len = MAX(len, XLOG_FALLOCATE_MIN);
+	off_t offset = log->offset + log->alloc_len;
+
+	int rc = posix_fallocate(log->fd, offset, len);
+	if (rc != 0) {
+		xlog_write_error(log);
+		/* posix_fallocate() doesn't set errno. */
+		errno = rc;
+		diag_set(SystemError, "%s: can't allocate disk space",
+			 log->filename);
+		return -1;
+	}
+
+	log->alloc_len += len;
+	return log->alloc_len;
+#else
+	(void)log;
+	(void)len;
+	return 0;
+#endif /* HAVE_POSIX_FALLOCATE */
+}
+
 /**
  * Write a sequence of uncompressed xrow objects.
  *
@@ -1168,17 +1226,14 @@ xlog_tx_write(struct xlog *log)
 	});
 
 	obuf_reset(&log->obuf);
-	/*
-	 * Simplify recovery after a temporary write failure:
-	 * truncate the file to the best known good write
-	 * position.
-	 */
 	if (written < 0) {
-		if (lseek(log->fd, log->offset, SEEK_SET) < 0 ||
-		    ftruncate(log->fd, log->offset) != 0)
-			panic_syserror("failed to truncate xlog after write error");
+		xlog_write_error(log);
 		return -1;
 	}
+	if (log->alloc_len > (size_t)written)
+		log->alloc_len -= written;
+	else
+		log->alloc_len = 0;
 	log->offset += written;
 	log->rows += log->tx_rows;
 	log->tx_rows = 0;
@@ -1376,6 +1431,19 @@ xlog_write_eof(struct xlog *l)
 		diag_set(ClientError, ER_INJECTION, "xlog write injection");
 		return -1;
 	});
+
+	/*
+	 * Free disk space preallocated with xlog_fallocate()
+	 * and reset alloc_len. Don't write the eof marker if
+	 * this fails, otherwise we'll get "data after eof
+	 * marker" error on recovery.
+	 */
+	if (l->alloc_len > 0 && ftruncate(l->fd, l->offset) < 0) {
+		diag_set(SystemError, "ftruncate() failed");
+		return -1;
+	}
+	l->alloc_len = 0;
+
 	if (fio_writen(l->fd, &eof_marker, sizeof(eof_marker)) < 0) {
 		diag_set(SystemError, "write() failed");
 		return -1;
@@ -1791,6 +1857,15 @@ xlog_cursor_next_tx(struct xlog_cursor *i)
 		return -1;
 	if (rc > 0)
 		return 1;
+	if (load_u32(i->rbuf.rpos) == 0) {
+		/*
+		 * Space preallocated with xlog_fallocate() and not
+		 * written yet. Treat as eof and clear the buffer.
+		 */
+		i->read_offset -= ibuf_used(&i->rbuf);
+		ibuf_reset(&i->rbuf);
+		return 1;
+	}
 	if (load_u32(i->rbuf.rpos) == eof_marker) {
 		/* eof marker found */
 		goto eof_found;
diff --git a/src/box/xlog.h b/src/box/xlog.h
index c2ac4774..f9243935 100644
--- a/src/box/xlog.h
+++ b/src/box/xlog.h
@@ -303,6 +303,11 @@ struct xlog {
 	/** The current offset in the log file, for writing. */
 	off_t offset;
 	/**
+	 * Number of bytes of disk space preallocated at @offset
+	 * by xlog_fallocate() and not yet written to.
+	 */
+	size_t alloc_len;
+	/**
 	 * Output buffer, works as row accumulator for
 	 * compression.
 	 */
@@ -423,6 +428,21 @@ int
 xlog_rename(struct xlog *l);
 
 /**
+ * Try to allocate at least @len bytes of disk space starting
+ * at the current write position of the given xlog file. This
+ * function can be used to ensure that the following write of
+ * @len bytes will not fail with ENOSPC.
+ *
+ * On success, this function returns the number of bytes available
+ * for writing. If fallocate is not supported by the underlying OS,
+ * it returns 0.
+ *
+ * On error, it returns -1 and sets diag and errno.
+ */
+ssize_t
+xlog_fallocate(struct xlog *log, size_t len);
+
+/**
  * Write a row to xlog, 
  *
  * @retval count of writen bytes
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 3fc007a8..b73b1f2f 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -68,6 +68,20 @@ struct xrow_header {
 };
 
 /**
+ * Return an upper bound on the size the given row is going
+ * to take when encoded into a binary packet: the maximal
+ * header size plus the total length of all body parts.
+ */
+static inline size_t
+xrow_len_max(const struct xrow_header *row)
+{
+	size_t len = XROW_HEADER_LEN_MAX;
+	for (int i = 0; i < row->bodycnt; i++)
+		len += row->body[i].iov_len;
+	return len;
+}
+
+/**
  * Encode xrow into a binary packet
  *
  * @param header xrow
diff --git a/src/trivia/config.h.cmake b/src/trivia/config.h.cmake
index 66ddba99..64ae8b61 100644
--- a/src/trivia/config.h.cmake
+++ b/src/trivia/config.h.cmake
@@ -166,6 +166,7 @@
 #cmakedefine HAVE_PTHREAD_YIELD 1
 #cmakedefine HAVE_SCHED_YIELD 1
 #cmakedefine HAVE_POSIX_FADVISE 1
+#cmakedefine HAVE_POSIX_FALLOCATE 1
 #cmakedefine HAVE_MREMAP 1
 
 #cmakedefine HAVE_PRCTL_H 1
-- 
2.11.0




More information about the Tarantool-patches mailing list