[PATCH v3 1/5] wal: preallocate disk space before writing rows

Vladimir Davydov vdavydov.dev at gmail.com
Wed Oct 24 16:43:13 MSK 2018


This patch introduces a new xlog method xlog_fallocate() that makes
sure that the requested amount of disk space is available at the current
write position. It does that with posix_fallocate(). The new method is
called before writing anything to WAL, see wal_fallocate(). In order not
to invoke the system call too often, wal_fallocate() allocates disk
space in big chunks (1 MB).

The reason why I'm doing this is that I want to have a single and
clearly defined point in the code to handle ENOSPC errors, where
I could delete old WALs and retry (this is what #3397 is about).

Needed for #3397
---
 CMakeLists.txt            |  1 +
 src/box/journal.c         |  1 +
 src/box/journal.h         |  4 ++++
 src/box/txn.c             |  1 +
 src/box/wal.c             | 46 ++++++++++++++++++++++++++++++++++++++++++++++
 src/box/xlog.c            | 45 +++++++++++++++++++++++++++++++++++++++++++++
 src/box/xlog.h            | 16 ++++++++++++++++
 src/box/xrow.h            | 13 +++++++++++++
 src/trivia/config.h.cmake |  1 +
 9 files changed, 128 insertions(+)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 439a2750..c61d5569 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -82,6 +82,7 @@ check_symbol_exists(fdatasync unistd.h HAVE_FDATASYNC)
 check_symbol_exists(pthread_yield pthread.h HAVE_PTHREAD_YIELD)
 check_symbol_exists(sched_yield sched.h HAVE_SCHED_YIELD)
 check_symbol_exists(posix_fadvise fcntl.h HAVE_POSIX_FADVISE)
+check_symbol_exists(posix_fallocate fcntl.h HAVE_POSIX_FALLOCATE)
 check_symbol_exists(mremap sys/mman.h HAVE_MREMAP)
 
 check_function_exists(sync_file_range HAVE_SYNC_FILE_RANGE)
diff --git a/src/box/journal.c b/src/box/journal.c
index fd4f9539..7498ba19 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -66,6 +66,7 @@ journal_entry_new(size_t n_rows)
 		diag_set(OutOfMemory, size, "region", "struct journal_entry");
 		return NULL;
 	}
+	entry->approx_len = 0;
 	entry->n_rows = n_rows;
 	entry->res = -1;
 	entry->fiber = fiber();
diff --git a/src/box/journal.h b/src/box/journal.h
index 1d64a7bd..e5231688 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -59,6 +59,10 @@ struct journal_entry {
 	 */
 	struct fiber *fiber;
 	/**
+	 * Approximate size of this request when encoded.
+	 */
+	size_t approx_len;
+	/**
 	 * The number of rows in the request.
 	 */
 	int n_rows;
diff --git a/src/box/txn.c b/src/box/txn.c
index 17d97d76..1c75ed6b 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -272,6 +272,7 @@ txn_write_to_wal(struct txn *txn)
 		if (stmt->row == NULL)
 			continue; /* A read (e.g. select) request */
 		*row++ = stmt->row;
+		req->approx_len += xrow_approx_len(stmt->row);
 	}
 	assert(row == req->rows + req->n_rows);
 
diff --git a/src/box/wal.c b/src/box/wal.c
index f87b40ae..b7acb50e 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -44,6 +44,17 @@
 #include "coio_task.h"
 #include "replication.h"
 
+enum {
+	/**
+	 * Size of disk space to preallocate with xlog_fallocate().
+	 * Obviously, we want to call this function as infrequently as
+	 * possible to avoid the overhead associated with a system
+	 * call, however at the same time we do not want to call it
+	 * to allocate too big chunks, because this may increase tx
+	 * latency. 1 MB seems to be a well balanced choice.
+	 */
+	WAL_FALLOCATE_LEN = 1024 * 1024,
+};
 
 const char *wal_mode_STRS[] = { "none", "write", "fsync", NULL };
 
@@ -126,6 +137,8 @@ struct wal_writer
 
 struct wal_msg {
 	struct cmsg base;
+	/** Approximate size of this request when encoded. */
+	size_t approx_len;
 	/** Input queue, on output contains all committed requests. */
 	struct stailq commit;
 	/**
@@ -168,6 +181,7 @@ static void
 wal_msg_create(struct wal_msg *batch)
 {
 	cmsg_init(&batch->base, wal_request_route);
+	batch->approx_len = 0;
 	stailq_create(&batch->commit);
 	stailq_create(&batch->rollback);
 }
@@ -603,6 +617,31 @@ wal_opt_rotate(struct wal_writer *writer)
 	return 0;
 }
 
+/**
+ * Make sure there's enough disk space to append @len bytes
+ * of data to the current WAL.
+ */
+static int
+wal_fallocate(struct wal_writer *writer, size_t len)
+{
+	struct xlog *l = &writer->current_wal;
+
+	/*
+	 * The actual write size can be greater than the sum size
+	 * of encoded rows (compression, fixheaders). Double the
+	 * given length to get a rough upper bound estimate.
+	 */
+	len *= 2;
+
+	if (l->allocated >= len)
+		return 0;
+	if (xlog_fallocate(l, MAX(len, WAL_FALLOCATE_LEN)) == 0)
+		return 0;
+
+	diag_log();
+	return -1;
+}
+
 static void
 wal_writer_clear_bus(struct cmsg *msg)
 {
@@ -689,6 +728,12 @@ wal_write_to_disk(struct cmsg *msg)
 		return wal_writer_begin_rollback(writer);
 	}
 
+	/* Ensure there's enough disk space before writing anything. */
+	if (wal_fallocate(writer, wal_msg->approx_len) != 0) {
+		stailq_concat(&wal_msg->rollback, &wal_msg->commit);
+		return wal_writer_begin_rollback(writer);
+	}
+
 	/*
 	 * This code tries to write queued requests (=transactions) using as
 	 * few I/O syscalls and memory copies as possible. For this reason
@@ -858,6 +903,7 @@ wal_write(struct journal *journal, struct journal_entry *entry)
 		stailq_add_tail_entry(&batch->commit, entry, fifo);
 		cpipe_push(&wal_thread.wal_pipe, &batch->base);
 	}
+	batch->approx_len += entry->approx_len;
 	wal_thread.wal_pipe.n_input += entry->n_rows * XROW_IOVMAX;
 	cpipe_flush_input(&wal_thread.wal_pipe);
 	/**
diff --git a/src/box/xlog.c b/src/box/xlog.c
index cd2d6e1d..0f2f97b7 100644
--- a/src/box/xlog.c
+++ b/src/box/xlog.c
@@ -990,6 +990,26 @@ xdir_create_xlog(struct xdir *dir, struct xlog *xlog,
 	return 0;
 }
 
+ssize_t
+xlog_fallocate(struct xlog *log, size_t len)
+{
+#ifdef HAVE_POSIX_FALLOCATE
+	int rc = posix_fallocate(log->fd, log->offset + log->allocated, len);
+	if (rc != 0) {
+		errno = rc;
+		diag_set(SystemError, "%s: can't allocate disk space",
+			 log->filename);
+		return -1;
+	}
+	log->allocated += len;
+	return 0;
+#else
+	(void)log;
+	(void)len;
+	return 0;
+#endif /* HAVE_POSIX_FALLOCATE */
+}
+
 /**
  * Write a sequence of uncompressed xrow objects.
  *
@@ -1179,8 +1199,13 @@ xlog_tx_write(struct xlog *log)
 		if (lseek(log->fd, log->offset, SEEK_SET) < 0 ||
 		    ftruncate(log->fd, log->offset) != 0)
 			panic_syserror("failed to truncate xlog after write error");
+		log->allocated = 0;
 		return -1;
 	}
+	if (log->allocated > (size_t)written)
+		log->allocated -= written;
+	else
+		log->allocated = 0;
 	log->offset += written;
 	log->rows += log->tx_rows;
 	log->tx_rows = 0;
@@ -1378,6 +1403,17 @@ xlog_write_eof(struct xlog *l)
 		diag_set(ClientError, ER_INJECTION, "xlog write injection");
 		return -1;
 	});
+
+	/*
+	 * Free disk space preallocated with xlog_fallocate().
+	 * Don't write the eof marker if this fails, otherwise
+	 * we'll get "data after eof marker" error on recovery.
+	 */
+	if (l->allocated > 0 && ftruncate(l->fd, l->offset) < 0) {
+		diag_set(SystemError, "ftruncate() failed");
+		return -1;
+	}
+
 	if (fio_writen(l->fd, &eof_marker, sizeof(eof_marker)) < 0) {
 		diag_set(SystemError, "write() failed");
 		return -1;
@@ -1793,6 +1829,15 @@ xlog_cursor_next_tx(struct xlog_cursor *i)
 		return -1;
 	if (rc > 0)
 		return 1;
+	if (load_u32(i->rbuf.rpos) == 0) {
+		/*
+		 * Space preallocated with xlog_fallocate().
+		 * Treat as eof and clear the buffer.
+		 */
+		i->read_offset -= ibuf_used(&i->rbuf);
+		ibuf_reset(&i->rbuf);
+		return 1;
+	}
 	if (load_u32(i->rbuf.rpos) == eof_marker) {
 		/* eof marker found */
 		goto eof_found;
diff --git a/src/box/xlog.h b/src/box/xlog.h
index e3c38486..e2fdfd74 100644
--- a/src/box/xlog.h
+++ b/src/box/xlog.h
@@ -314,6 +314,11 @@ struct xlog {
 	/** The current offset in the log file, for writing. */
 	off_t offset;
 	/**
+	 * Size of disk space preallocated at @offset with
+	 * xlog_fallocate().
+	 */
+	size_t allocated;
+	/**
 	 * Output buffer, works as row accumulator for
 	 * compression.
 	 */
@@ -434,6 +439,17 @@ int
 xlog_rename(struct xlog *l);
 
 /**
+ * Allocate @size bytes of disk space at the end of the given
+ * xlog file.
+ *
+ * Returns -1 on fallocate error and sets both diag and errno
+ * accordingly. On success returns 0. If the underlying OS
+ * does not support fallocate, this function also returns 0.
+ */
+ssize_t
+xlog_fallocate(struct xlog *log, size_t size);
+
+/**
  * Write a row to xlog, 
  *
  * @retval count of writen bytes
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 3fc007a8..32d4d54b 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -68,6 +68,19 @@ struct xrow_header {
 };
 
 /**
+ * Return the max size which the given row is going to take when
+ * encoded into a binary packet.
+ */
+static inline size_t
+xrow_approx_len(struct xrow_header *row)
+{
+	size_t len = XROW_HEADER_LEN_MAX;
+	for (int i = 0; i < row->bodycnt; i++)
+		len += row->body[i].iov_len;
+	return len;
+}
+
+/**
  * Encode xrow into a binary packet
  *
  * @param header xrow
diff --git a/src/trivia/config.h.cmake b/src/trivia/config.h.cmake
index 8894b436..53eae2fe 100644
--- a/src/trivia/config.h.cmake
+++ b/src/trivia/config.h.cmake
@@ -166,6 +166,7 @@
 #cmakedefine HAVE_PTHREAD_YIELD 1
 #cmakedefine HAVE_SCHED_YIELD 1
 #cmakedefine HAVE_POSIX_FADVISE 1
+#cmakedefine HAVE_POSIX_FALLOCATE 1
 #cmakedefine HAVE_MREMAP 1
 #cmakedefine HAVE_SYNC_FILE_RANGE 1
 
-- 
2.11.0




More information about the Tarantool-patches mailing list