* [PATCH v3 1/5] wal: preallocate disk space before writing rows
  2018-10-24 13:43 [PATCH v3 0/5] Delete old WAL files if running out of disk space Vladimir Davydov
@ 2018-10-24 13:43 ` Vladimir Davydov
  2018-10-24 17:08   ` Konstantin Osipov
  2018-10-24 13:43 ` [PATCH v3 2/5] wal: pass wal_watcher_msg to wal_watcher callback Vladimir Davydov
                   ` (4 subsequent siblings)
  5 siblings, 1 reply; 10+ messages in thread
From: Vladimir Davydov @ 2018-10-24 13:43 UTC (permalink / raw)
  To: kostja; +Cc: tarantool-patches
This patch introduces a new xlog method, xlog_fallocate(), which makes
sure that the requested amount of disk space is available at the current
write position. It does that with posix_fallocate(). The new method is
called before writing anything to WAL, see wal_fallocate(). To avoid
invoking the system call too often, wal_fallocate() allocates disk
space in big chunks (1 MB).
The reason for this change is to have a single, clearly defined point
in the code for handling ENOSPC errors, where old WALs can be deleted
and the write retried (this is what #3397 is about).
Needed for #3397
---
 CMakeLists.txt            |  1 +
 src/box/journal.c         |  1 +
 src/box/journal.h         |  4 ++++
 src/box/txn.c             |  1 +
 src/box/wal.c             | 46 ++++++++++++++++++++++++++++++++++++++++++++++
 src/box/xlog.c            | 45 +++++++++++++++++++++++++++++++++++++++++++++
 src/box/xlog.h            | 16 ++++++++++++++++
 src/box/xrow.h            | 13 +++++++++++++
 src/trivia/config.h.cmake |  1 +
 9 files changed, 128 insertions(+)
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 439a2750..c61d5569 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -82,6 +82,7 @@ check_symbol_exists(fdatasync unistd.h HAVE_FDATASYNC)
 check_symbol_exists(pthread_yield pthread.h HAVE_PTHREAD_YIELD)
 check_symbol_exists(sched_yield sched.h HAVE_SCHED_YIELD)
 check_symbol_exists(posix_fadvise fcntl.h HAVE_POSIX_FADVISE)
+check_symbol_exists(posix_fallocate fcntl.h HAVE_POSIX_FALLOCATE)
 check_symbol_exists(mremap sys/mman.h HAVE_MREMAP)
 
 check_function_exists(sync_file_range HAVE_SYNC_FILE_RANGE)
diff --git a/src/box/journal.c b/src/box/journal.c
index fd4f9539..7498ba19 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -66,6 +66,7 @@ journal_entry_new(size_t n_rows)
 		diag_set(OutOfMemory, size, "region", "struct journal_entry");
 		return NULL;
 	}
+	entry->approx_len = 0;
 	entry->n_rows = n_rows;
 	entry->res = -1;
 	entry->fiber = fiber();
diff --git a/src/box/journal.h b/src/box/journal.h
index 1d64a7bd..e5231688 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -59,6 +59,10 @@ struct journal_entry {
 	 */
 	struct fiber *fiber;
 	/**
+	 * Approximate size of this request when encoded.
+	 */
+	size_t approx_len;
+	/**
 	 * The number of rows in the request.
 	 */
 	int n_rows;
diff --git a/src/box/txn.c b/src/box/txn.c
index 17d97d76..1c75ed6b 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -272,6 +272,7 @@ txn_write_to_wal(struct txn *txn)
 		if (stmt->row == NULL)
 			continue; /* A read (e.g. select) request */
 		*row++ = stmt->row;
+		req->approx_len += xrow_approx_len(stmt->row);
 	}
 	assert(row == req->rows + req->n_rows);
 
diff --git a/src/box/wal.c b/src/box/wal.c
index f87b40ae..b7acb50e 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -44,6 +44,17 @@
 #include "coio_task.h"
 #include "replication.h"
 
+enum {
+	/**
+	 * Size of disk space to preallocate with xlog_fallocate().
+	 * We want to call this function as infrequently as possible
+	 * to avoid the overhead associated with a system call, but
+	 * at the same time we do not want it to allocate chunks that
+	 * are too big, because this may increase tx latency.
+	 * 1 MB seems to be a well-balanced choice.
+	 */
+	WAL_FALLOCATE_LEN = 1024 * 1024,
+};
 
 const char *wal_mode_STRS[] = { "none", "write", "fsync", NULL };
 
@@ -126,6 +137,8 @@ struct wal_writer
 
 struct wal_msg {
 	struct cmsg base;
+	/** Approximate size of this request when encoded. */
+	size_t approx_len;
 	/** Input queue, on output contains all committed requests. */
 	struct stailq commit;
 	/**
@@ -168,6 +181,7 @@ static void
 wal_msg_create(struct wal_msg *batch)
 {
 	cmsg_init(&batch->base, wal_request_route);
+	batch->approx_len = 0;
 	stailq_create(&batch->commit);
 	stailq_create(&batch->rollback);
 }
@@ -603,6 +617,31 @@ wal_opt_rotate(struct wal_writer *writer)
 	return 0;
 }
 
+/**
+ * Make sure there's enough disk space to append @len bytes
+ * of data to the current WAL.
+ */
+static int
+wal_fallocate(struct wal_writer *writer, size_t len)
+{
+	struct xlog *l = &writer->current_wal;
+
+	/*
+	 * The actual write size can be greater than the sum size
+	 * of encoded rows (compression, fixheaders). Double the
+	 * given length to get a rough upper bound estimate.
+	 */
+	len *= 2;
+
+	if (l->allocated >= len)
+		return 0;
+	if (xlog_fallocate(l, MAX(len, WAL_FALLOCATE_LEN)) == 0)
+		return 0;
+
+	diag_log();
+	return -1;
+}
+
 static void
 wal_writer_clear_bus(struct cmsg *msg)
 {
@@ -689,6 +728,12 @@ wal_write_to_disk(struct cmsg *msg)
 		return wal_writer_begin_rollback(writer);
 	}
 
+	/* Ensure there's enough disk space before writing anything. */
+	if (wal_fallocate(writer, wal_msg->approx_len) != 0) {
+		stailq_concat(&wal_msg->rollback, &wal_msg->commit);
+		return wal_writer_begin_rollback(writer);
+	}
+
 	/*
 	 * This code tries to write queued requests (=transactions) using as
 	 * few I/O syscalls and memory copies as possible. For this reason
@@ -858,6 +903,7 @@ wal_write(struct journal *journal, struct journal_entry *entry)
 		stailq_add_tail_entry(&batch->commit, entry, fifo);
 		cpipe_push(&wal_thread.wal_pipe, &batch->base);
 	}
+	batch->approx_len += entry->approx_len;
 	wal_thread.wal_pipe.n_input += entry->n_rows * XROW_IOVMAX;
 	cpipe_flush_input(&wal_thread.wal_pipe);
 	/**
diff --git a/src/box/xlog.c b/src/box/xlog.c
index cd2d6e1d..0f2f97b7 100644
--- a/src/box/xlog.c
+++ b/src/box/xlog.c
@@ -990,6 +990,26 @@ xdir_create_xlog(struct xdir *dir, struct xlog *xlog,
 	return 0;
 }
 
+ssize_t
+xlog_fallocate(struct xlog *log, size_t len)
+{
+#ifdef HAVE_POSIX_FALLOCATE
+	int rc = posix_fallocate(log->fd, log->offset + log->allocated, len);
+	if (rc != 0) {
+		errno = rc;
+		diag_set(SystemError, "%s: can't allocate disk space",
+			 log->filename);
+		return -1;
+	}
+	log->allocated += len;
+	return 0;
+#else
+	(void)log;
+	(void)len;
+	return 0;
+#endif /* HAVE_POSIX_FALLOCATE */
+}
+
 /**
  * Write a sequence of uncompressed xrow objects.
  *
@@ -1179,8 +1199,13 @@ xlog_tx_write(struct xlog *log)
 		if (lseek(log->fd, log->offset, SEEK_SET) < 0 ||
 		    ftruncate(log->fd, log->offset) != 0)
 			panic_syserror("failed to truncate xlog after write error");
+		log->allocated = 0;
 		return -1;
 	}
+	if (log->allocated > (size_t)written)
+		log->allocated -= written;
+	else
+		log->allocated = 0;
 	log->offset += written;
 	log->rows += log->tx_rows;
 	log->tx_rows = 0;
@@ -1378,6 +1403,17 @@ xlog_write_eof(struct xlog *l)
 		diag_set(ClientError, ER_INJECTION, "xlog write injection");
 		return -1;
 	});
+
+	/*
+	 * Free disk space preallocated with xlog_fallocate().
+	 * Don't write the eof marker if this fails, otherwise
+	 * we'll get "data after eof marker" error on recovery.
+	 */
+	if (l->allocated > 0 && ftruncate(l->fd, l->offset) < 0) {
+		diag_set(SystemError, "ftruncate() failed");
+		return -1;
+	}
+
 	if (fio_writen(l->fd, &eof_marker, sizeof(eof_marker)) < 0) {
 		diag_set(SystemError, "write() failed");
 		return -1;
@@ -1793,6 +1829,15 @@ xlog_cursor_next_tx(struct xlog_cursor *i)
 		return -1;
 	if (rc > 0)
 		return 1;
+	if (load_u32(i->rbuf.rpos) == 0) {
+		/*
+		 * Space preallocated with xlog_fallocate().
+		 * Treat as eof and clear the buffer.
+		 */
+		i->read_offset -= ibuf_used(&i->rbuf);
+		ibuf_reset(&i->rbuf);
+		return 1;
+	}
 	if (load_u32(i->rbuf.rpos) == eof_marker) {
 		/* eof marker found */
 		goto eof_found;
diff --git a/src/box/xlog.h b/src/box/xlog.h
index e3c38486..e2fdfd74 100644
--- a/src/box/xlog.h
+++ b/src/box/xlog.h
@@ -314,6 +314,11 @@ struct xlog {
 	/** The current offset in the log file, for writing. */
 	off_t offset;
 	/**
+	 * Size of disk space preallocated at @offset with
+	 * xlog_fallocate().
+	 */
+	size_t allocated;
+	/**
 	 * Output buffer, works as row accumulator for
 	 * compression.
 	 */
@@ -434,6 +439,17 @@ int
 xlog_rename(struct xlog *l);
 
 /**
+ * Allocate @size bytes of disk space at the end of the given
+ * xlog file.
+ *
+ * Returns -1 on fallocate error and sets both diag and errno
+ * accordingly. On success returns 0. If the underlying OS
+ * does not support fallocate, this function also returns 0.
+ */
+ssize_t
+xlog_fallocate(struct xlog *log, size_t size);
+
+/**
  * Write a row to xlog, 
  *
  * @retval count of writen bytes
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 3fc007a8..32d4d54b 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -68,6 +68,19 @@ struct xrow_header {
 };
 
 /**
+ * Return the max size which the given row is going to take when
+ * encoded into a binary packet.
+ */
+static inline size_t
+xrow_approx_len(struct xrow_header *row)
+{
+	size_t len = XROW_HEADER_LEN_MAX;
+	for (int i = 0; i < row->bodycnt; i++)
+		len += row->body[i].iov_len;
+	return len;
+}
+
+/**
  * Encode xrow into a binary packet
  *
  * @param header xrow
diff --git a/src/trivia/config.h.cmake b/src/trivia/config.h.cmake
index 8894b436..53eae2fe 100644
--- a/src/trivia/config.h.cmake
+++ b/src/trivia/config.h.cmake
@@ -166,6 +166,7 @@
 #cmakedefine HAVE_PTHREAD_YIELD 1
 #cmakedefine HAVE_SCHED_YIELD 1
 #cmakedefine HAVE_POSIX_FADVISE 1
+#cmakedefine HAVE_POSIX_FALLOCATE 1
 #cmakedefine HAVE_MREMAP 1
 #cmakedefine HAVE_SYNC_FILE_RANGE 1
 
-- 
2.11.0
^ permalink raw reply	[flat|nested] 10+ messages in thread* [PATCH v3 5/5] wal: delete old wal files when running out of disk space
  2018-10-24 13:43 [PATCH v3 0/5] Delete old WAL files if running out of disk space Vladimir Davydov
                   ` (3 preceding siblings ...)
  2018-10-24 13:43 ` [PATCH v3 4/5] wal: add event_mask to wal_watcher Vladimir Davydov
@ 2018-10-24 13:43 ` Vladimir Davydov
  2018-10-25 12:55 ` [PATCH v3 0/5] Delete old WAL files if " Vladimir Davydov
  5 siblings, 0 replies; 10+ messages in thread
From: Vladimir Davydov @ 2018-10-24 13:43 UTC (permalink / raw)
  To: kostja; +Cc: tarantool-patches
Now, if the WAL thread fails to preallocate the disk space needed to
commit a transaction, it deletes old WAL files until either the
allocation succeeds or no files are left except those needed for local
recovery from the oldest
via the WAL watcher interface. The latter then deactivates consumers
that would need deleted files.
The user doesn't see an ENOSPC error if the WAL thread successfully
allocates disk space after deleting old files. Here's what is printed
to the log when this happens:
  wal/101/main C> ran out of disk space, try to delete old WAL files
  wal/101/main I> removed /home/vlad/src/tarantool/test/var/001_replication/master/00000000000000000005.xlog
  wal/101/main I> removed /home/vlad/src/tarantool/test/var/001_replication/master/00000000000000000006.xlog
  wal/101/main I> removed /home/vlad/src/tarantool/test/var/001_replication/master/00000000000000000007.xlog
  main/105/main C> deactivated WAL consumer replica 82d0fa3f-6881-4bc5-a2c0-a0f5dcf80120 at {1: 5}
  main/105/main C> deactivated WAL consumer replica 98dce0a8-1213-4824-b31e-c7e3c4eaf437 at {1: 7}
Closes #3397
---
 src/box/box.cc                        |   9 +-
 src/box/gc.c                          |  66 +++++++++-
 src/box/gc.h                          |  31 +++++
 src/box/wal.c                         |  96 ++++++++++----
 src/box/wal.h                         |  24 +++-
 src/box/xlog.c                        |   3 +
 src/box/xlog.h                        |  33 +++++
 src/errinj.h                          |   1 +
 test/box/errinj.result                |  18 +--
 test/replication/gc_no_space.result   | 234 ++++++++++++++++++++++++++++++++++
 test/replication/gc_no_space.test.lua | 103 +++++++++++++++
 test/replication/suite.ini            |   2 +-
 12 files changed, 575 insertions(+), 45 deletions(-)
 create mode 100644 test/replication/gc_no_space.result
 create mode 100644 test/replication/gc_no_space.test.lua
diff --git a/src/box/box.cc b/src/box/box.cc
index 79a818ec..c1d30aca 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -2092,14 +2092,19 @@ box_cfg_xc(void)
 		}
 	}
 
+	struct gc_checkpoint *first_checkpoint = gc_first_checkpoint();
+	assert(first_checkpoint != NULL);
+
 	/* Start WAL writer */
 	int64_t wal_max_rows = box_check_wal_max_rows(cfg_geti64("rows_per_wal"));
 	int64_t wal_max_size = box_check_wal_max_size(cfg_geti64("wal_max_size"));
 	enum wal_mode wal_mode = box_check_wal_mode(cfg_gets("wal_mode"));
-	if (wal_init(wal_mode, cfg_gets("wal_dir"), &INSTANCE_UUID,
-		      &replicaset.vclock, wal_max_rows, wal_max_size)) {
+	if (wal_init(wal_mode, cfg_gets("wal_dir"), wal_max_rows,
+		     wal_max_size, &INSTANCE_UUID, &replicaset.vclock,
+		     vclock_sum(&first_checkpoint->vclock))) {
 		diag_raise();
 	}
+	gc_set_wal_watcher();
 
 	rmean_cleanup(rmean_box);
 
diff --git a/src/box/gc.c b/src/box/gc.c
index becb5d09..467eecb9 100644
--- a/src/box/gc.c
+++ b/src/box/gc.c
@@ -48,6 +48,7 @@
 #include "say.h"
 #include "latch.h"
 #include "vclock.h"
+#include "cbus.h"
 #include "engine.h"		/* engine_collect_garbage() */
 #include "wal.h"		/* wal_collect_garbage() */
 
@@ -102,9 +103,24 @@ gc_init(void)
 	latch_create(&gc.latch);
 }
 
+static void
+gc_process_wal_event(struct wal_watcher_msg *);
+
+void
+gc_set_wal_watcher(void)
+{
+	wal_set_watcher(&gc.wal_watcher, "tx", gc_process_wal_event,
+			cbus_process, WAL_EVENT_GC);
+}
+
 void
 gc_free(void)
 {
+	/*
+	 * Can't clear the WAL watcher as the event loop isn't
+	 * running when this function is called.
+	 */
+
 	/* Free checkpoints. */
 	struct gc_checkpoint *checkpoint, *next_checkpoint;
 	rlist_foreach_entry_safe(checkpoint, &gc.checkpoints, in_checkpoints,
@@ -175,6 +191,9 @@ gc_run(void)
 	if (!run_engine_gc && !run_wal_gc)
 		return; /* nothing to do */
 
+	int64_t wal_lsn = vclock_sum(vclock);
+	int64_t checkpoint_lsn = vclock_sum(&checkpoint->vclock);
+
 	/*
 	 * Engine callbacks may sleep, because they use coio for
 	 * removing files. Make sure we won't try to remove the
@@ -191,12 +210,44 @@ gc_run(void)
 	 */
 	int rc = 0;
 	if (run_engine_gc)
-		rc = engine_collect_garbage(vclock_sum(&checkpoint->vclock));
-	if (run_wal_gc && rc == 0)
-		wal_collect_garbage(vclock_sum(vclock));
+		rc = engine_collect_garbage(checkpoint_lsn);
+	/*
+	 * Run wal_collect_garbage() even if we don't need to
+	 * delete any WAL files, because we have to apprise
+	 * the WAL thread of the oldest checkpoint signature.
+	 */
+	if (rc == 0)
+		wal_collect_garbage(wal_lsn, checkpoint_lsn);
 	latch_unlock(&gc.latch);
 }
 
+/**
+ * Deactivate consumers that need files deleted by the WAL thread.
+ */
+static void
+gc_process_wal_event(struct wal_watcher_msg *msg)
+{
+	assert((msg->events & WAL_EVENT_GC) != 0);
+
+	struct gc_consumer *consumer = gc_tree_first(&gc.consumers);
+	while (consumer != NULL &&
+	       vclock_sum(&consumer->vclock) < vclock_sum(&msg->gc_vclock)) {
+		struct gc_consumer *next = gc_tree_next(&gc.consumers,
+							consumer);
+		assert(!consumer->is_inactive);
+		consumer->is_inactive = true;
+		gc_tree_remove(&gc.consumers, consumer);
+
+		char *vclock_str = vclock_to_string(&consumer->vclock);
+		say_crit("deactivated WAL consumer %s at %s",
+			 consumer->name, vclock_str);
+		free(vclock_str);
+
+		consumer = next;
+	}
+	gc_run();
+}
+
 void
 gc_set_min_checkpoint_count(int min_checkpoint_count)
 {
@@ -279,14 +330,19 @@ gc_consumer_register(const struct vclock *vclock, const char *format, ...)
 void
 gc_consumer_unregister(struct gc_consumer *consumer)
 {
-	gc_tree_remove(&gc.consumers, consumer);
+	if (!consumer->is_inactive) {
+		gc_tree_remove(&gc.consumers, consumer);
+		gc_run();
+	}
 	gc_consumer_delete(consumer);
-	gc_run();
 }
 
 void
 gc_consumer_advance(struct gc_consumer *consumer, const struct vclock *vclock)
 {
+	if (consumer->is_inactive)
+		return;
+
 	int64_t signature = vclock_sum(vclock);
 	int64_t prev_signature = vclock_sum(&consumer->vclock);
 
diff --git a/src/box/gc.h b/src/box/gc.h
index a5392cef..e1241baa 100644
--- a/src/box/gc.h
+++ b/src/box/gc.h
@@ -36,6 +36,7 @@
 
 #include "vclock.h"
 #include "latch.h"
+#include "wal.h"
 #include "trivia/util.h"
 
 #if defined(__cplusplus)
@@ -89,6 +90,11 @@ struct gc_consumer {
 	char name[GC_NAME_MAX];
 	/** The vclock tracked by this consumer. */
 	struct vclock vclock;
+	/**
+	 * This flag is set if a WAL needed by this consumer was
+	 * deleted by the WAL thread on ENOSPC.
+	 */
+	bool is_inactive;
 };
 
 typedef rb_tree(struct gc_consumer) gc_tree_t;
@@ -120,6 +126,11 @@ struct gc_state {
 	 * garbage collection callbacks.
 	 */
 	struct latch latch;
+	/**
+	 * WAL event watcher. Needed to shoot off stale consumers
+	 * when a WAL file is deleted due to ENOSPC.
+	 */
+	struct wal_watcher wal_watcher;
 };
 extern struct gc_state gc;
 
@@ -145,6 +156,20 @@ extern struct gc_state gc;
 	rlist_foreach_entry(ref, &(checkpoint)->refs, in_refs)
 
 /**
+ * Return the first (oldest) checkpoint known to the garbage
+ * collector. If there's no checkpoint, return NULL.
+ */
+static inline struct gc_checkpoint *
+gc_first_checkpoint(void)
+{
+	if (rlist_empty(&gc.checkpoints))
+		return NULL;
+
+	return rlist_first_entry(&gc.checkpoints, struct gc_checkpoint,
+				 in_checkpoints);
+}
+
+/**
  * Return the last (newest) checkpoint known to the garbage
  * collector. If there's no checkpoint, return NULL.
  */
@@ -165,6 +190,12 @@ void
 gc_init(void);
 
 /**
+ * Set WAL watcher. Called after WAL is initialized.
+ */
+void
+gc_set_wal_watcher(void);
+
+/**
  * Destroy the garbage collection state.
  */
 void
diff --git a/src/box/wal.c b/src/box/wal.c
index 09927f6f..a3be55f7 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -119,6 +119,12 @@ struct wal_writer
 	 * with this LSN and LSN becomes "real".
 	 */
 	struct vclock vclock;
+	/**
+	 * Signature of the oldest checkpoint available on the instance.
+	 * The WAL writer must not delete WAL files that are needed to
+	 * recover from it even if it is running out of disk space.
+	 */
+	int64_t checkpoint_lsn;
 	/** The current WAL file. */
 	struct xlog current_wal;
 	/**
@@ -287,9 +293,9 @@ tx_schedule_rollback(struct cmsg *msg)
  */
 static void
 wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
-		  const char *wal_dirname, const struct tt_uuid *instance_uuid,
-		  struct vclock *vclock, int64_t wal_max_rows,
-		  int64_t wal_max_size)
+		  const char *wal_dirname, int64_t wal_max_rows,
+		  int64_t wal_max_size, const struct tt_uuid *instance_uuid,
+		  const struct vclock *vclock, int64_t checkpoint_lsn)
 {
 	writer->wal_mode = wal_mode;
 	writer->wal_max_rows = wal_max_rows;
@@ -309,6 +315,7 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
 	vclock_create(&writer->vclock);
 	vclock_copy(&writer->vclock, vclock);
 
+	writer->checkpoint_lsn = checkpoint_lsn;
 	rlist_create(&writer->watchers);
 }
 
@@ -412,16 +419,16 @@ wal_open(struct wal_writer *writer)
  *        mode are closed. WAL thread has been started.
  */
 int
-wal_init(enum wal_mode wal_mode, const char *wal_dirname,
-	 const struct tt_uuid *instance_uuid, struct vclock *vclock,
-	 int64_t wal_max_rows, int64_t wal_max_size)
+wal_init(enum wal_mode wal_mode, const char *wal_dirname, int64_t wal_max_rows,
+	 int64_t wal_max_size, const struct tt_uuid *instance_uuid,
+	 const struct vclock *vclock, int64_t first_checkpoint_lsn)
 {
 	assert(wal_max_rows > 1);
 
 	struct wal_writer *writer = &wal_writer_singleton;
-
-	wal_writer_create(writer, wal_mode, wal_dirname, instance_uuid,
-			  vclock, wal_max_rows, wal_max_size);
+	wal_writer_create(writer, wal_mode, wal_dirname, wal_max_rows,
+			  wal_max_size, instance_uuid, vclock,
+			  first_checkpoint_lsn);
 
 	/*
 	 * Scan the WAL directory to build an index of all
@@ -539,25 +546,29 @@ wal_checkpoint(struct vclock *vclock, bool rotate)
 struct wal_gc_msg
 {
 	struct cbus_call_msg base;
-	int64_t lsn;
+	int64_t wal_lsn;
+	int64_t checkpoint_lsn;
 };
 
 static int
 wal_collect_garbage_f(struct cbus_call_msg *data)
 {
-	int64_t lsn = ((struct wal_gc_msg *)data)->lsn;
-	xdir_collect_garbage(&wal_writer_singleton.wal_dir, lsn, 0);
+	struct wal_writer *writer = &wal_writer_singleton;
+	struct wal_gc_msg *msg = (struct wal_gc_msg *)data;
+	writer->checkpoint_lsn = msg->checkpoint_lsn;
+	xdir_collect_garbage(&writer->wal_dir, msg->wal_lsn, 0);
 	return 0;
 }
 
 void
-wal_collect_garbage(int64_t lsn)
+wal_collect_garbage(int64_t wal_lsn, int64_t checkpoint_lsn)
 {
 	struct wal_writer *writer = &wal_writer_singleton;
 	if (writer->wal_mode == WAL_NONE)
 		return;
 	struct wal_gc_msg msg;
-	msg.lsn = lsn;
+	msg.wal_lsn = wal_lsn;
+	msg.checkpoint_lsn = checkpoint_lsn;
 	bool cancellable = fiber_set_cancellable(false);
 	cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe, &msg.base,
 		  wal_collect_garbage_f, NULL, TIMEOUT_INFINITY);
@@ -620,11 +631,16 @@ wal_opt_rotate(struct wal_writer *writer)
 /**
  * Make sure there's enough disk space to append @len bytes
  * of data to the current WAL.
+ *
+ * If fallocate() fails with ENOSPC, delete old WAL files
+ * that are not needed for recovery and retry.
  */
 static int
 wal_fallocate(struct wal_writer *writer, size_t len)
 {
+	bool warn_no_space = true;
 	struct xlog *l = &writer->current_wal;
+	struct errinj *errinj = errinj(ERRINJ_WAL_FALLOCATE, ERRINJ_INT);
 
 	/*
 	 * The actual write size can be greater than the sum size
@@ -633,11 +649,41 @@ wal_fallocate(struct wal_writer *writer, size_t len)
 	 */
 	len *= 2;
 
-	if (l->allocated >= len)
-		return 0;
-	if (xlog_fallocate(l, MAX(len, WAL_FALLOCATE_LEN)) == 0)
-		return 0;
+retry:
+	if (errinj == NULL || errinj->iparam == 0) {
+		if (l->allocated >= len)
+			return 0;
+		if (xlog_fallocate(l, MAX(len, WAL_FALLOCATE_LEN)) == 0)
+			return 0;
+	} else {
+		errinj->iparam--;
+		diag_set(ClientError, ER_INJECTION, "xlog fallocate");
+		errno = ENOSPC;
+	}
+	if (errno != ENOSPC)
+		goto error;
+	if (!xdir_has_garbage(&writer->wal_dir, writer->checkpoint_lsn))
+		goto error;
+
+	if (warn_no_space) {
+		say_crit("ran out of disk space, try to delete old WAL files");
+		warn_no_space = false;
+	}
+
+	/* Keep the original error. */
+	struct diag diag;
+	diag_create(&diag);
+	diag_move(diag_get(), &diag);
+	if (xdir_collect_garbage(&writer->wal_dir, writer->checkpoint_lsn,
+				 XDIR_GC_REMOVE_ONE) != 0) {
+		diag_move(&diag, diag_get());
+		goto error;
+	}
+	diag_destroy(&diag);
 
+	wal_notify_watchers(writer, WAL_EVENT_GC);
+	goto retry;
+error:
 	diag_log();
 	return -1;
 }
@@ -1027,13 +1073,16 @@ wal_watcher_notify(struct wal_watcher *watcher, unsigned events)
 {
 	assert(!rlist_empty(&watcher->next));
 
+	struct wal_watcher_msg *msg = &watcher->msg;
+	struct wal_writer *writer = &wal_writer_singleton;
+
 	events &= watcher->event_mask;
 	if (events == 0) {
 		/* The watcher isn't interested in this event. */
 		return;
 	}
 
-	if (watcher->msg.cmsg.route != NULL) {
+	if (msg->cmsg.route != NULL) {
 		/*
 		 * If the notification message is still en route,
 		 * mark the watcher to resend it as soon as it
@@ -1043,9 +1092,12 @@ wal_watcher_notify(struct wal_watcher *watcher, unsigned events)
 		return;
 	}
 
-	watcher->msg.events = events;
-	cmsg_init(&watcher->msg.cmsg, watcher->route);
-	cpipe_push(&watcher->watcher_pipe, &watcher->msg.cmsg);
+	msg->events = events;
+	if (xdir_first_vclock(&writer->wal_dir, &msg->gc_vclock) < 0)
+		vclock_copy(&msg->gc_vclock, &writer->vclock);
+
+	cmsg_init(&msg->cmsg, watcher->route);
+	cpipe_push(&watcher->watcher_pipe, &msg->cmsg);
 }
 
 static void
diff --git a/src/box/wal.h b/src/box/wal.h
index e8a4299c..d4a37c55 100644
--- a/src/box/wal.h
+++ b/src/box/wal.h
@@ -35,9 +35,9 @@
 #include "small/rlist.h"
 #include "cbus.h"
 #include "journal.h"
+#include "vclock.h"
 
 struct fiber;
-struct vclock;
 struct wal_writer;
 struct tt_uuid;
 
@@ -56,9 +56,9 @@ void
 wal_thread_start();
 
 int
-wal_init(enum wal_mode wal_mode, const char *wal_dirname,
-	 const struct tt_uuid *instance_uuid, struct vclock *vclock,
-	 int64_t wal_max_rows, int64_t wal_max_size);
+wal_init(enum wal_mode wal_mode, const char *wal_dirname, int64_t wal_max_rows,
+	 int64_t wal_max_size, const struct tt_uuid *instance_uuid,
+	 const struct vclock *vclock, int64_t first_checkpoint_lsn);
 
 void
 wal_thread_stop();
@@ -73,6 +73,8 @@ struct wal_watcher_msg {
 	struct wal_watcher *watcher;
 	/** Bit mask of events, see wal_event. */
 	unsigned events;
+	/** VClock of the oldest stored WAL row. */
+	struct vclock gc_vclock;
 };
 
 enum wal_event {
@@ -80,6 +82,11 @@ enum wal_event {
 	WAL_EVENT_WRITE		= (1 << 0),
 	/** A new WAL is created. */
 	WAL_EVENT_ROTATE	= (1 << 1),
+	/**
+	 * The WAL thread ran out of disk space and had to delete
+	 * one or more old WAL files.
+	 */
+	WAL_EVENT_GC		= (1 << 2),
 };
 
 struct wal_watcher {
@@ -168,11 +175,14 @@ int
 wal_checkpoint(struct vclock *vclock, bool rotate);
 
 /**
- * Remove WAL files that are not needed to recover
- * from snapshot with @lsn or newer.
+ * Remove all WAL files whose signature is less than @wal_lsn.
+ * Update the oldest checkpoint signature with @checkpoint_lsn.
+ * WAL thread will delete WAL files that are not needed to
+ * recover from the oldest checkpoint if it runs out of disk
+ * space.
  */
 void
-wal_collect_garbage(int64_t lsn);
+wal_collect_garbage(int64_t wal_lsn, int64_t checkpoint_lsn);
 
 void
 wal_init_vy_log();
diff --git a/src/box/xlog.c b/src/box/xlog.c
index 0f2f97b7..191fadcd 100644
--- a/src/box/xlog.c
+++ b/src/box/xlog.c
@@ -678,6 +678,9 @@ xdir_collect_garbage(struct xdir *dir, int64_t signature, unsigned flags)
 			say_info("removed %s", filename);
 		vclockset_remove(&dir->index, vclock);
 		free(vclock);
+
+		if (flags & XDIR_GC_REMOVE_ONE)
+			break;
 	}
 	return 0;
 }
diff --git a/src/box/xlog.h b/src/box/xlog.h
index e2fdfd74..7c69cc4f 100644
--- a/src/box/xlog.h
+++ b/src/box/xlog.h
@@ -179,6 +179,20 @@ xdir_format_filename(struct xdir *dir, int64_t signature,
 		     enum log_suffix suffix);
 
 /**
+ * Return true if the given directory index has files whose
+ * signature is less than specified.
+ *
+ * Supposed to be used to check if xdir_collect_garbage() can
+ * actually delete some files.
+ */
+static inline bool
+xdir_has_garbage(struct xdir *dir, int64_t signature)
+{
+	struct vclock *vclock = vclockset_first(&dir->index);
+	return vclock != NULL && vclock_sum(vclock) < signature;
+}
+
+/**
  * Flags passed to xdir_collect_garbage().
  */
 enum {
@@ -187,6 +201,10 @@ enum {
 	 * the caller thread.
 	 */
 	XDIR_GC_USE_COIO = 1 << 0,
+	/**
+	 * Return after removing a file.
+	 */
+	XDIR_GC_REMOVE_ONE = 1 << 1,
 };
 
 /**
@@ -203,6 +221,21 @@ void
 xdir_collect_inprogress(struct xdir *xdir);
 
 /**
+ * Return LSN and vclock (unless @vclock is NULL) of the oldest
+ * file in a directory or -1 if the directory is empty.
+ */
+static inline int64_t
+xdir_first_vclock(struct xdir *xdir, struct vclock *vclock)
+{
+	struct vclock *first = vclockset_first(&xdir->index);
+	if (first == NULL)
+		return -1;
+	if (vclock != NULL)
+		vclock_copy(vclock, first);
+	return vclock_sum(first);
+}
+
+/**
  * Return LSN and vclock (unless @vclock is NULL) of the newest
  * file in a directory or -1 if the directory is empty.
  */
diff --git a/src/errinj.h b/src/errinj.h
index 84a1fbb5..50062b62 100644
--- a/src/errinj.h
+++ b/src/errinj.h
@@ -79,6 +79,7 @@ struct errinj {
 	_(ERRINJ_WAL_WRITE_DISK, ERRINJ_BOOL, {.bparam = false}) \
 	_(ERRINJ_WAL_WRITE_EOF, ERRINJ_BOOL, {.bparam = false}) \
 	_(ERRINJ_WAL_DELAY, ERRINJ_BOOL, {.bparam = false}) \
+	_(ERRINJ_WAL_FALLOCATE, ERRINJ_INT, {.iparam = 0}) \
 	_(ERRINJ_INDEX_ALLOC, ERRINJ_BOOL, {.bparam = false}) \
 	_(ERRINJ_TUPLE_ALLOC, ERRINJ_BOOL, {.bparam = false}) \
 	_(ERRINJ_TUPLE_FIELD, ERRINJ_BOOL, {.bparam = false}) \
diff --git a/test/box/errinj.result b/test/box/errinj.result
index c4a1326c..62dcc6a4 100644
--- a/test/box/errinj.result
+++ b/test/box/errinj.result
@@ -44,8 +44,8 @@ errinj.info()
     state: false
   ERRINJ_WAL_IO:
     state: false
-  ERRINJ_VY_INDEX_FILE_RENAME:
-    state: false
+  ERRINJ_WAL_FALLOCATE:
+    state: 0
   ERRINJ_TUPLE_ALLOC:
     state: false
   ERRINJ_VY_RUN_FILE_RENAME:
@@ -54,18 +54,20 @@ errinj.info()
     state: false
   ERRINJ_RELAY_REPORT_INTERVAL:
     state: 0
+  ERRINJ_SNAP_COMMIT_DELAY:
+    state: false
   ERRINJ_VY_READ_PAGE_TIMEOUT:
     state: 0
   ERRINJ_XLOG_META:
     state: false
-  ERRINJ_SNAP_COMMIT_DELAY:
-    state: false
   ERRINJ_WAL_BREAK_LSN:
     state: -1
-  ERRINJ_WAL_WRITE_DISK:
-    state: false
   ERRINJ_RELAY_BREAK_LSN:
     state: -1
+  ERRINJ_WAL_WRITE_DISK:
+    state: false
+  ERRINJ_VY_INDEX_FILE_RENAME:
+    state: false
   ERRINJ_VY_LOG_FILE_RENAME:
     state: false
   ERRINJ_VY_RUN_WRITE:
@@ -100,11 +102,11 @@ errinj.info()
     state: false
   ERRINJ_INDEX_ALLOC:
     state: false
-  ERRINJ_RELAY_TIMEOUT:
+  ERRINJ_VY_RUN_WRITE_TIMEOUT:
     state: 0
   ERRINJ_TESTING:
     state: false
-  ERRINJ_VY_RUN_WRITE_TIMEOUT:
+  ERRINJ_RELAY_TIMEOUT:
     state: 0
   ERRINJ_VY_SQUASH_TIMEOUT:
     state: 0
diff --git a/test/replication/gc_no_space.result b/test/replication/gc_no_space.result
new file mode 100644
index 00000000..8e663cdf
--- /dev/null
+++ b/test/replication/gc_no_space.result
@@ -0,0 +1,234 @@
+--
+-- This test checks that when the WAL thread runs out of disk
+-- space it automatically deletes old WAL files and notifies
+-- the TX thread so that the latter can shoot off WAL consumers
+-- that need them. See gh-3397.
+--
+test_run = require('test_run').new()
+---
+...
+engine = test_run:get_cfg('engine')
+---
+...
+fio = require('fio')
+---
+...
+errinj = box.error.injection
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+function check_file_count(dir, glob, count)
+    local files = fio.glob(fio.pathjoin(dir, glob))
+    if #files == count then
+        return true
+    end
+    return false, files
+end;
+---
+...
+function check_wal_count(count)
+    return check_file_count(box.cfg.wal_dir, '*.xlog', count)
+end;
+---
+...
+function check_snap_count(count)
+    return check_file_count(box.cfg.memtx_dir, '*.snap', count)
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+default_checkpoint_count = box.cfg.checkpoint_count
+---
+...
+box.cfg{checkpoint_count = 2}
+---
+...
+test_run:cleanup_cluster()
+---
+...
+box.schema.user.grant('guest', 'replication')
+---
+...
+s = box.schema.space.create('test', {engine = engine})
+---
+...
+_ = s:create_index('pk')
+---
+...
+box.snapshot()
+---
+- ok
+...
+--
+-- Create a few dead replicas to pin WAL files.
+--
+test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
+---
+- true
+...
+test_run:cmd("start server replica")
+---
+- true
+...
+test_run:cmd("stop server replica")
+---
+- true
+...
+test_run:cmd("cleanup server replica")
+---
+- true
+...
+s:auto_increment{}
+---
+- [1]
+...
+box.snapshot()
+---
+- ok
+...
+test_run:cmd("start server replica")
+---
+- true
+...
+test_run:cmd("stop server replica")
+---
+- true
+...
+test_run:cmd("cleanup server replica")
+---
+- true
+...
+s:auto_increment{}
+---
+- [2]
+...
+box.snapshot()
+---
+- ok
+...
+test_run:cmd("start server replica")
+---
+- true
+...
+test_run:cmd("stop server replica")
+---
+- true
+...
+test_run:cmd("cleanup server replica")
+---
+- true
+...
+test_run:cmd("delete server replica")
+---
+- true
+...
+--
+-- Make a few checkpoints and check that old WAL files are not
+-- deleted.
+--
+s:auto_increment{}
+---
+- [3]
+...
+box.snapshot()
+---
+- ok
+...
+s:auto_increment{}
+---
+- [4]
+...
+box.snapshot()
+---
+- ok
+...
+s:auto_increment{}
+---
+- [5]
+...
+check_wal_count(7)
+---
+- true
+...
+check_snap_count(2)
+---
+- true
+...
+#box.info.gc().consumers -- 3
+---
+- 3
+...
+--
+-- Inject a ENOSPC error and check that the WAL thread deletes
+-- old WAL files to prevent the user from seeing the error.
+--
+errinj.set('ERRINJ_WAL_FALLOCATE', 3)
+---
+- ok
+...
+s:auto_increment{} -- success
+---
+- [6]
+...
+errinj.info()['ERRINJ_WAL_FALLOCATE'].state -- 0
+---
+- 0
+...
+check_wal_count(3)
+---
+- true
+...
+check_snap_count(2)
+---
+- true
+...
+#box.info.gc().consumers -- 1
+---
+- 1
+...
+--
+-- Check that the WAL thread never deletes WAL files that are
+-- needed for recovery from a checkpoint.
+--
+errinj.set('ERRINJ_WAL_FALLOCATE', 2)
+---
+- ok
+...
+s:auto_increment{} -- failure
+---
+- error: Failed to write to disk
+...
+errinj.info()['ERRINJ_WAL_FALLOCATE'].state -- 0
+---
+- 0
+...
+check_wal_count(2)
+---
+- true
+...
+check_snap_count(2)
+---
+- true
+...
+#box.info.gc().consumers -- 0
+---
+- 0
+...
+s:drop()
+---
+...
+box.schema.user.revoke('guest', 'replication')
+---
+...
+test_run:cleanup_cluster()
+---
+...
+box.cfg{checkpoint_count = default_checkpoint_count}
+---
+...
diff --git a/test/replication/gc_no_space.test.lua b/test/replication/gc_no_space.test.lua
new file mode 100644
index 00000000..4bab2b0e
--- /dev/null
+++ b/test/replication/gc_no_space.test.lua
@@ -0,0 +1,103 @@
+--
+-- This test checks that when the WAL thread runs out of disk
+-- space it automatically deletes old WAL files and notifies
+-- the TX thread so that the latter can shoot off WAL consumers
+-- that need them. See gh-3397.
+--
+test_run = require('test_run').new()
+engine = test_run:get_cfg('engine')
+
+fio = require('fio')
+errinj = box.error.injection
+
+test_run:cmd("setopt delimiter ';'")
+function check_file_count(dir, glob, count)
+    local files = fio.glob(fio.pathjoin(dir, glob))
+    if #files == count then
+        return true
+    end
+    return false, files
+end;
+function check_wal_count(count)
+    return check_file_count(box.cfg.wal_dir, '*.xlog', count)
+end;
+function check_snap_count(count)
+    return check_file_count(box.cfg.memtx_dir, '*.snap', count)
+end;
+test_run:cmd("setopt delimiter ''");
+
+default_checkpoint_count = box.cfg.checkpoint_count
+box.cfg{checkpoint_count = 2}
+
+test_run:cleanup_cluster()
+box.schema.user.grant('guest', 'replication')
+s = box.schema.space.create('test', {engine = engine})
+_ = s:create_index('pk')
+box.snapshot()
+
+--
+-- Create a few dead replicas to pin WAL files.
+--
+test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
+test_run:cmd("start server replica")
+test_run:cmd("stop server replica")
+test_run:cmd("cleanup server replica")
+
+s:auto_increment{}
+box.snapshot()
+
+test_run:cmd("start server replica")
+test_run:cmd("stop server replica")
+test_run:cmd("cleanup server replica")
+
+s:auto_increment{}
+box.snapshot()
+
+test_run:cmd("start server replica")
+test_run:cmd("stop server replica")
+test_run:cmd("cleanup server replica")
+test_run:cmd("delete server replica")
+
+--
+-- Make a few checkpoints and check that old WAL files are not
+-- deleted.
+--
+s:auto_increment{}
+box.snapshot()
+s:auto_increment{}
+box.snapshot()
+s:auto_increment{}
+
+check_wal_count(7)
+check_snap_count(2)
+#box.info.gc().consumers -- 3
+
+--
+-- Inject a ENOSPC error and check that the WAL thread deletes
+-- old WAL files to prevent the user from seeing the error.
+--
+errinj.set('ERRINJ_WAL_FALLOCATE', 3)
+s:auto_increment{} -- success
+errinj.info()['ERRINJ_WAL_FALLOCATE'].state -- 0
+
+check_wal_count(3)
+check_snap_count(2)
+#box.info.gc().consumers -- 1
+
+--
+-- Check that the WAL thread never deletes WAL files that are
+-- needed for recovery from a checkpoint.
+--
+errinj.set('ERRINJ_WAL_FALLOCATE', 2)
+s:auto_increment{} -- failure
+errinj.info()['ERRINJ_WAL_FALLOCATE'].state -- 0
+
+check_wal_count(2)
+check_snap_count(2)
+#box.info.gc().consumers -- 0
+
+s:drop()
+box.schema.user.revoke('guest', 'replication')
+test_run:cleanup_cluster()
+
+box.cfg{checkpoint_count = default_checkpoint_count}
diff --git a/test/replication/suite.ini b/test/replication/suite.ini
index f4abc7af..569c9048 100644
--- a/test/replication/suite.ini
+++ b/test/replication/suite.ini
@@ -3,7 +3,7 @@ core = tarantool
 script =  master.lua
 description = tarantool/box, replication
 disabled = consistent.test.lua
-release_disabled = catch.test.lua errinj.test.lua gc.test.lua before_replace.test.lua quorum.test.lua recover_missing_xlog.test.lua sync.test.lua
+release_disabled = catch.test.lua errinj.test.lua gc.test.lua gc_no_space.test.lua before_replace.test.lua quorum.test.lua recover_missing_xlog.test.lua sync.test.lua
 config = suite.cfg
 lua_libs = lua/fast_replica.lua lua/rlimit.lua
 long_run = prune.test.lua
-- 
2.11.0
^ permalink raw reply	[flat|nested] 10+ messages in thread