* [PATCH v3 1/5] wal: preallocate disk space before writing rows
2018-10-24 13:43 [PATCH v3 0/5] Delete old WAL files if running out of disk space Vladimir Davydov
@ 2018-10-24 13:43 ` Vladimir Davydov
2018-10-24 17:08 ` Konstantin Osipov
2018-10-24 13:43 ` [PATCH v3 2/5] wal: pass wal_watcher_msg to wal_watcher callback Vladimir Davydov
` (4 subsequent siblings)
5 siblings, 1 reply; 10+ messages in thread
From: Vladimir Davydov @ 2018-10-24 13:43 UTC (permalink / raw)
To: kostja; +Cc: tarantool-patches
This patch introduces a new xlog method xlog_fallocate() that makes
sure that the requested amount of disk space is available at the current
write position. It does that with posix_fallocate(). The new method is
called before writing anything to WAL, see wal_fallocate(). In order not
to invoke the system call too often, wal_fallocate() allocates disk
space in big chunks (1 MB).
The reason why I'm doing this is that I want to have a single and
clearly defined point in the code to handle ENOSPC errors, where
I could delete old WALs and retry (this is what #3397 is about).
Needed for #3397
---
CMakeLists.txt | 1 +
src/box/journal.c | 1 +
src/box/journal.h | 4 ++++
src/box/txn.c | 1 +
src/box/wal.c | 46 ++++++++++++++++++++++++++++++++++++++++++++++
src/box/xlog.c | 45 +++++++++++++++++++++++++++++++++++++++++++++
src/box/xlog.h | 16 ++++++++++++++++
src/box/xrow.h | 13 +++++++++++++
src/trivia/config.h.cmake | 1 +
9 files changed, 128 insertions(+)
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 439a2750..c61d5569 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -82,6 +82,7 @@ check_symbol_exists(fdatasync unistd.h HAVE_FDATASYNC)
check_symbol_exists(pthread_yield pthread.h HAVE_PTHREAD_YIELD)
check_symbol_exists(sched_yield sched.h HAVE_SCHED_YIELD)
check_symbol_exists(posix_fadvise fcntl.h HAVE_POSIX_FADVISE)
+check_symbol_exists(posix_fallocate fcntl.h HAVE_POSIX_FALLOCATE)
check_symbol_exists(mremap sys/mman.h HAVE_MREMAP)
check_function_exists(sync_file_range HAVE_SYNC_FILE_RANGE)
diff --git a/src/box/journal.c b/src/box/journal.c
index fd4f9539..7498ba19 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -66,6 +66,7 @@ journal_entry_new(size_t n_rows)
diag_set(OutOfMemory, size, "region", "struct journal_entry");
return NULL;
}
+ entry->approx_len = 0;
entry->n_rows = n_rows;
entry->res = -1;
entry->fiber = fiber();
diff --git a/src/box/journal.h b/src/box/journal.h
index 1d64a7bd..e5231688 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -59,6 +59,10 @@ struct journal_entry {
*/
struct fiber *fiber;
/**
+ * Approximate size of this request when encoded.
+ */
+ size_t approx_len;
+ /**
* The number of rows in the request.
*/
int n_rows;
diff --git a/src/box/txn.c b/src/box/txn.c
index 17d97d76..1c75ed6b 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -272,6 +272,7 @@ txn_write_to_wal(struct txn *txn)
if (stmt->row == NULL)
continue; /* A read (e.g. select) request */
*row++ = stmt->row;
+ req->approx_len += xrow_approx_len(stmt->row);
}
assert(row == req->rows + req->n_rows);
diff --git a/src/box/wal.c b/src/box/wal.c
index f87b40ae..b7acb50e 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -44,6 +44,17 @@
#include "coio_task.h"
#include "replication.h"
+enum {
+ /**
+ * Size of disk space to preallocate with xlog_fallocate().
+ * Obviously, we want to call this function as infrequently as
+ * possible to avoid the overhead associated with a system
+ * call, however at the same time we do not want to call it
+ * to allocate too big chunks, because this may increase tx
+ * latency. 1 MB seems to be a well balanced choice.
+ */
+ WAL_FALLOCATE_LEN = 1024 * 1024,
+};
const char *wal_mode_STRS[] = { "none", "write", "fsync", NULL };
@@ -126,6 +137,8 @@ struct wal_writer
struct wal_msg {
struct cmsg base;
+ /** Approximate size of this request when encoded. */
+ size_t approx_len;
/** Input queue, on output contains all committed requests. */
struct stailq commit;
/**
@@ -168,6 +181,7 @@ static void
wal_msg_create(struct wal_msg *batch)
{
cmsg_init(&batch->base, wal_request_route);
+ batch->approx_len = 0;
stailq_create(&batch->commit);
stailq_create(&batch->rollback);
}
@@ -603,6 +617,31 @@ wal_opt_rotate(struct wal_writer *writer)
return 0;
}
+/**
+ * Make sure there's enough disk space to append @len bytes
+ * of data to the current WAL.
+ */
+static int
+wal_fallocate(struct wal_writer *writer, size_t len)
+{
+ struct xlog *l = &writer->current_wal;
+
+ /*
+ * The actual write size can be greater than the sum size
+ * of encoded rows (compression, fixheaders). Double the
+ * given length to get a rough upper bound estimate.
+ */
+ len *= 2;
+
+ if (l->allocated >= len)
+ return 0;
+ if (xlog_fallocate(l, MAX(len, WAL_FALLOCATE_LEN)) == 0)
+ return 0;
+
+ diag_log();
+ return -1;
+}
+
static void
wal_writer_clear_bus(struct cmsg *msg)
{
@@ -689,6 +728,12 @@ wal_write_to_disk(struct cmsg *msg)
return wal_writer_begin_rollback(writer);
}
+ /* Ensure there's enough disk space before writing anything. */
+ if (wal_fallocate(writer, wal_msg->approx_len) != 0) {
+ stailq_concat(&wal_msg->rollback, &wal_msg->commit);
+ return wal_writer_begin_rollback(writer);
+ }
+
/*
* This code tries to write queued requests (=transactions) using as
* few I/O syscalls and memory copies as possible. For this reason
@@ -858,6 +903,7 @@ wal_write(struct journal *journal, struct journal_entry *entry)
stailq_add_tail_entry(&batch->commit, entry, fifo);
cpipe_push(&wal_thread.wal_pipe, &batch->base);
}
+ batch->approx_len += entry->approx_len;
wal_thread.wal_pipe.n_input += entry->n_rows * XROW_IOVMAX;
cpipe_flush_input(&wal_thread.wal_pipe);
/**
diff --git a/src/box/xlog.c b/src/box/xlog.c
index cd2d6e1d..0f2f97b7 100644
--- a/src/box/xlog.c
+++ b/src/box/xlog.c
@@ -990,6 +990,26 @@ xdir_create_xlog(struct xdir *dir, struct xlog *xlog,
return 0;
}
+ssize_t
+xlog_fallocate(struct xlog *log, size_t len)
+{
+#ifdef HAVE_POSIX_FALLOCATE
+ int rc = posix_fallocate(log->fd, log->offset + log->allocated, len);
+ if (rc != 0) {
+ errno = rc;
+ diag_set(SystemError, "%s: can't allocate disk space",
+ log->filename);
+ return -1;
+ }
+ log->allocated += len;
+ return 0;
+#else
+ (void)log;
+ (void)len;
+ return 0;
+#endif /* HAVE_POSIX_FALLOCATE */
+}
+
/**
* Write a sequence of uncompressed xrow objects.
*
@@ -1179,8 +1199,13 @@ xlog_tx_write(struct xlog *log)
if (lseek(log->fd, log->offset, SEEK_SET) < 0 ||
ftruncate(log->fd, log->offset) != 0)
panic_syserror("failed to truncate xlog after write error");
+ log->allocated = 0;
return -1;
}
+ if (log->allocated > (size_t)written)
+ log->allocated -= written;
+ else
+ log->allocated = 0;
log->offset += written;
log->rows += log->tx_rows;
log->tx_rows = 0;
@@ -1378,6 +1403,17 @@ xlog_write_eof(struct xlog *l)
diag_set(ClientError, ER_INJECTION, "xlog write injection");
return -1;
});
+
+ /*
+ * Free disk space preallocated with xlog_fallocate().
+ * Don't write the eof marker if this fails, otherwise
+ * we'll get "data after eof marker" error on recovery.
+ */
+ if (l->allocated > 0 && ftruncate(l->fd, l->offset) < 0) {
+ diag_set(SystemError, "ftruncate() failed");
+ return -1;
+ }
+
if (fio_writen(l->fd, &eof_marker, sizeof(eof_marker)) < 0) {
diag_set(SystemError, "write() failed");
return -1;
@@ -1793,6 +1829,15 @@ xlog_cursor_next_tx(struct xlog_cursor *i)
return -1;
if (rc > 0)
return 1;
+ if (load_u32(i->rbuf.rpos) == 0) {
+ /*
+ * Space preallocated with xlog_fallocate().
+ * Treat as eof and clear the buffer.
+ */
+ i->read_offset -= ibuf_used(&i->rbuf);
+ ibuf_reset(&i->rbuf);
+ return 1;
+ }
if (load_u32(i->rbuf.rpos) == eof_marker) {
/* eof marker found */
goto eof_found;
diff --git a/src/box/xlog.h b/src/box/xlog.h
index e3c38486..e2fdfd74 100644
--- a/src/box/xlog.h
+++ b/src/box/xlog.h
@@ -314,6 +314,11 @@ struct xlog {
/** The current offset in the log file, for writing. */
off_t offset;
/**
+ * Size of disk space preallocated at @offset with
+ * xlog_fallocate().
+ */
+ size_t allocated;
+ /**
* Output buffer, works as row accumulator for
* compression.
*/
@@ -434,6 +439,17 @@ int
xlog_rename(struct xlog *l);
/**
+ * Allocate @size bytes of disk space at the end of the given
+ * xlog file.
+ *
+ * Returns -1 on fallocate error and sets both diag and errno
+ * accordingly. On success returns 0. If the underlying OS
+ * does not support fallocate, this function also returns 0.
+ */
+ssize_t
+xlog_fallocate(struct xlog *log, size_t size);
+
+/**
* Write a row to xlog,
*
* @retval count of writen bytes
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 3fc007a8..32d4d54b 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -68,6 +68,19 @@ struct xrow_header {
};
/**
+ * Return the max size which the given row is going to take when
+ * encoded into a binary packet.
+ */
+static inline size_t
+xrow_approx_len(struct xrow_header *row)
+{
+ size_t len = XROW_HEADER_LEN_MAX;
+ for (int i = 0; i < row->bodycnt; i++)
+ len += row->body[i].iov_len;
+ return len;
+}
+
+/**
* Encode xrow into a binary packet
*
* @param header xrow
diff --git a/src/trivia/config.h.cmake b/src/trivia/config.h.cmake
index 8894b436..53eae2fe 100644
--- a/src/trivia/config.h.cmake
+++ b/src/trivia/config.h.cmake
@@ -166,6 +166,7 @@
#cmakedefine HAVE_PTHREAD_YIELD 1
#cmakedefine HAVE_SCHED_YIELD 1
#cmakedefine HAVE_POSIX_FADVISE 1
+#cmakedefine HAVE_POSIX_FALLOCATE 1
#cmakedefine HAVE_MREMAP 1
#cmakedefine HAVE_SYNC_FILE_RANGE 1
--
2.11.0
^ permalink raw reply [flat|nested] 10+ messages in thread
* [PATCH v3 5/5] wal: delete old wal files when running out of disk space
2018-10-24 13:43 [PATCH v3 0/5] Delete old WAL files if running out of disk space Vladimir Davydov
` (3 preceding siblings ...)
2018-10-24 13:43 ` [PATCH v3 4/5] wal: add event_mask to wal_watcher Vladimir Davydov
@ 2018-10-24 13:43 ` Vladimir Davydov
2018-10-25 12:55 ` [PATCH v3 0/5] Delete old WAL files if " Vladimir Davydov
5 siblings, 0 replies; 10+ messages in thread
From: Vladimir Davydov @ 2018-10-24 13:43 UTC (permalink / raw)
To: kostja; +Cc: tarantool-patches
Now if the WAL thread fails to preallocate disk space needed to commit
a transaction, it will delete old WAL files until it succeeds or it
deletes all files that are not needed for local recovery from the oldest
checkpoint. After it deletes a file, it notifies the garbage collector
via the WAL watcher interface. The garbage collector then deactivates
consumers that need the deleted files.
The user doesn't see an ENOSPC error if the WAL thread successfully
allocates disk space after deleting old files. Here's what's printed
to the log when this happens:
wal/101/main C> ran out of disk space, try to delete old WAL files
wal/101/main I> removed /home/vlad/src/tarantool/test/var/001_replication/master/00000000000000000005.xlog
wal/101/main I> removed /home/vlad/src/tarantool/test/var/001_replication/master/00000000000000000006.xlog
wal/101/main I> removed /home/vlad/src/tarantool/test/var/001_replication/master/00000000000000000007.xlog
main/105/main C> deactivated WAL consumer replica 82d0fa3f-6881-4bc5-a2c0-a0f5dcf80120 at {1: 5}
main/105/main C> deactivated WAL consumer replica 98dce0a8-1213-4824-b31e-c7e3c4eaf437 at {1: 7}
Closes #3397
---
src/box/box.cc | 9 +-
src/box/gc.c | 66 +++++++++-
src/box/gc.h | 31 +++++
src/box/wal.c | 96 ++++++++++----
src/box/wal.h | 24 +++-
src/box/xlog.c | 3 +
src/box/xlog.h | 33 +++++
src/errinj.h | 1 +
test/box/errinj.result | 18 +--
test/replication/gc_no_space.result | 234 ++++++++++++++++++++++++++++++++++
test/replication/gc_no_space.test.lua | 103 +++++++++++++++
test/replication/suite.ini | 2 +-
12 files changed, 575 insertions(+), 45 deletions(-)
create mode 100644 test/replication/gc_no_space.result
create mode 100644 test/replication/gc_no_space.test.lua
diff --git a/src/box/box.cc b/src/box/box.cc
index 79a818ec..c1d30aca 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -2092,14 +2092,19 @@ box_cfg_xc(void)
}
}
+ struct gc_checkpoint *first_checkpoint = gc_first_checkpoint();
+ assert(first_checkpoint != NULL);
+
/* Start WAL writer */
int64_t wal_max_rows = box_check_wal_max_rows(cfg_geti64("rows_per_wal"));
int64_t wal_max_size = box_check_wal_max_size(cfg_geti64("wal_max_size"));
enum wal_mode wal_mode = box_check_wal_mode(cfg_gets("wal_mode"));
- if (wal_init(wal_mode, cfg_gets("wal_dir"), &INSTANCE_UUID,
- &replicaset.vclock, wal_max_rows, wal_max_size)) {
+ if (wal_init(wal_mode, cfg_gets("wal_dir"), wal_max_rows,
+ wal_max_size, &INSTANCE_UUID, &replicaset.vclock,
+ vclock_sum(&first_checkpoint->vclock))) {
diag_raise();
}
+ gc_set_wal_watcher();
rmean_cleanup(rmean_box);
diff --git a/src/box/gc.c b/src/box/gc.c
index becb5d09..467eecb9 100644
--- a/src/box/gc.c
+++ b/src/box/gc.c
@@ -48,6 +48,7 @@
#include "say.h"
#include "latch.h"
#include "vclock.h"
+#include "cbus.h"
#include "engine.h" /* engine_collect_garbage() */
#include "wal.h" /* wal_collect_garbage() */
@@ -102,9 +103,24 @@ gc_init(void)
latch_create(&gc.latch);
}
+static void
+gc_process_wal_event(struct wal_watcher_msg *);
+
+void
+gc_set_wal_watcher(void)
+{
+ wal_set_watcher(&gc.wal_watcher, "tx", gc_process_wal_event,
+ cbus_process, WAL_EVENT_GC);
+}
+
void
gc_free(void)
{
+ /*
+ * Can't clear the WAL watcher as the event loop isn't
+ * running when this function is called.
+ */
+
/* Free checkpoints. */
struct gc_checkpoint *checkpoint, *next_checkpoint;
rlist_foreach_entry_safe(checkpoint, &gc.checkpoints, in_checkpoints,
@@ -175,6 +191,9 @@ gc_run(void)
if (!run_engine_gc && !run_wal_gc)
return; /* nothing to do */
+ int64_t wal_lsn = vclock_sum(vclock);
+ int64_t checkpoint_lsn = vclock_sum(&checkpoint->vclock);
+
/*
* Engine callbacks may sleep, because they use coio for
* removing files. Make sure we won't try to remove the
@@ -191,12 +210,44 @@ gc_run(void)
*/
int rc = 0;
if (run_engine_gc)
- rc = engine_collect_garbage(vclock_sum(&checkpoint->vclock));
- if (run_wal_gc && rc == 0)
- wal_collect_garbage(vclock_sum(vclock));
+ rc = engine_collect_garbage(checkpoint_lsn);
+ /*
+ * Run wal_collect_garbage() even if we don't need to
+ * delete any WAL files, because we have to apprise
+ * the WAL thread of the oldest checkpoint signature.
+ */
+ if (rc == 0)
+ wal_collect_garbage(wal_lsn, checkpoint_lsn);
latch_unlock(&gc.latch);
}
+/**
+ * Deactivate consumers that need files deleted by the WAL thread.
+ */
+static void
+gc_process_wal_event(struct wal_watcher_msg *msg)
+{
+ assert((msg->events & WAL_EVENT_GC) != 0);
+
+ struct gc_consumer *consumer = gc_tree_first(&gc.consumers);
+ while (consumer != NULL &&
+ vclock_sum(&consumer->vclock) < vclock_sum(&msg->gc_vclock)) {
+ struct gc_consumer *next = gc_tree_next(&gc.consumers,
+ consumer);
+ assert(!consumer->is_inactive);
+ consumer->is_inactive = true;
+ gc_tree_remove(&gc.consumers, consumer);
+
+ char *vclock_str = vclock_to_string(&consumer->vclock);
+ say_crit("deactivated WAL consumer %s at %s",
+ consumer->name, vclock_str);
+ free(vclock_str);
+
+ consumer = next;
+ }
+ gc_run();
+}
+
void
gc_set_min_checkpoint_count(int min_checkpoint_count)
{
@@ -279,14 +330,19 @@ gc_consumer_register(const struct vclock *vclock, const char *format, ...)
void
gc_consumer_unregister(struct gc_consumer *consumer)
{
- gc_tree_remove(&gc.consumers, consumer);
+ if (!consumer->is_inactive) {
+ gc_tree_remove(&gc.consumers, consumer);
+ gc_run();
+ }
gc_consumer_delete(consumer);
- gc_run();
}
void
gc_consumer_advance(struct gc_consumer *consumer, const struct vclock *vclock)
{
+ if (consumer->is_inactive)
+ return;
+
int64_t signature = vclock_sum(vclock);
int64_t prev_signature = vclock_sum(&consumer->vclock);
diff --git a/src/box/gc.h b/src/box/gc.h
index a5392cef..e1241baa 100644
--- a/src/box/gc.h
+++ b/src/box/gc.h
@@ -36,6 +36,7 @@
#include "vclock.h"
#include "latch.h"
+#include "wal.h"
#include "trivia/util.h"
#if defined(__cplusplus)
@@ -89,6 +90,11 @@ struct gc_consumer {
char name[GC_NAME_MAX];
/** The vclock tracked by this consumer. */
struct vclock vclock;
+ /**
+ * This flag is set if a WAL needed by this consumer was
+ * deleted by the WAL thread on ENOSPC.
+ */
+ bool is_inactive;
};
typedef rb_tree(struct gc_consumer) gc_tree_t;
@@ -120,6 +126,11 @@ struct gc_state {
* garbage collection callbacks.
*/
struct latch latch;
+ /**
+ * WAL event watcher. Needed to shoot off stale consumers
+ * when a WAL file is deleted due to ENOSPC.
+ */
+ struct wal_watcher wal_watcher;
};
extern struct gc_state gc;
@@ -145,6 +156,20 @@ extern struct gc_state gc;
rlist_foreach_entry(ref, &(checkpoint)->refs, in_refs)
/**
+ * Return the first (oldest) checkpoint known to the garbage
+ * collector. If there's no checkpoint, return NULL.
+ */
+static inline struct gc_checkpoint *
+gc_first_checkpoint(void)
+{
+ if (rlist_empty(&gc.checkpoints))
+ return NULL;
+
+ return rlist_first_entry(&gc.checkpoints, struct gc_checkpoint,
+ in_checkpoints);
+}
+
+/**
* Return the last (newest) checkpoint known to the garbage
* collector. If there's no checkpoint, return NULL.
*/
@@ -165,6 +190,12 @@ void
gc_init(void);
/**
+ * Set WAL watcher. Called after WAL is initialized.
+ */
+void
+gc_set_wal_watcher(void);
+
+/**
* Destroy the garbage collection state.
*/
void
diff --git a/src/box/wal.c b/src/box/wal.c
index 09927f6f..a3be55f7 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -119,6 +119,12 @@ struct wal_writer
* with this LSN and LSN becomes "real".
*/
struct vclock vclock;
+ /**
+ * Signature of the oldest checkpoint available on the instance.
+ * The WAL writer must not delete WAL files that are needed to
+ * recover from it even if it is running out of disk space.
+ */
+ int64_t checkpoint_lsn;
/** The current WAL file. */
struct xlog current_wal;
/**
@@ -287,9 +293,9 @@ tx_schedule_rollback(struct cmsg *msg)
*/
static void
wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
- const char *wal_dirname, const struct tt_uuid *instance_uuid,
- struct vclock *vclock, int64_t wal_max_rows,
- int64_t wal_max_size)
+ const char *wal_dirname, int64_t wal_max_rows,
+ int64_t wal_max_size, const struct tt_uuid *instance_uuid,
+ const struct vclock *vclock, int64_t checkpoint_lsn)
{
writer->wal_mode = wal_mode;
writer->wal_max_rows = wal_max_rows;
@@ -309,6 +315,7 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
vclock_create(&writer->vclock);
vclock_copy(&writer->vclock, vclock);
+ writer->checkpoint_lsn = checkpoint_lsn;
rlist_create(&writer->watchers);
}
@@ -412,16 +419,16 @@ wal_open(struct wal_writer *writer)
* mode are closed. WAL thread has been started.
*/
int
-wal_init(enum wal_mode wal_mode, const char *wal_dirname,
- const struct tt_uuid *instance_uuid, struct vclock *vclock,
- int64_t wal_max_rows, int64_t wal_max_size)
+wal_init(enum wal_mode wal_mode, const char *wal_dirname, int64_t wal_max_rows,
+ int64_t wal_max_size, const struct tt_uuid *instance_uuid,
+ const struct vclock *vclock, int64_t first_checkpoint_lsn)
{
assert(wal_max_rows > 1);
struct wal_writer *writer = &wal_writer_singleton;
-
- wal_writer_create(writer, wal_mode, wal_dirname, instance_uuid,
- vclock, wal_max_rows, wal_max_size);
+ wal_writer_create(writer, wal_mode, wal_dirname, wal_max_rows,
+ wal_max_size, instance_uuid, vclock,
+ first_checkpoint_lsn);
/*
* Scan the WAL directory to build an index of all
@@ -539,25 +546,29 @@ wal_checkpoint(struct vclock *vclock, bool rotate)
struct wal_gc_msg
{
struct cbus_call_msg base;
- int64_t lsn;
+ int64_t wal_lsn;
+ int64_t checkpoint_lsn;
};
static int
wal_collect_garbage_f(struct cbus_call_msg *data)
{
- int64_t lsn = ((struct wal_gc_msg *)data)->lsn;
- xdir_collect_garbage(&wal_writer_singleton.wal_dir, lsn, 0);
+ struct wal_writer *writer = &wal_writer_singleton;
+ struct wal_gc_msg *msg = (struct wal_gc_msg *)data;
+ writer->checkpoint_lsn = msg->checkpoint_lsn;
+ xdir_collect_garbage(&writer->wal_dir, msg->wal_lsn, 0);
return 0;
}
void
-wal_collect_garbage(int64_t lsn)
+wal_collect_garbage(int64_t wal_lsn, int64_t checkpoint_lsn)
{
struct wal_writer *writer = &wal_writer_singleton;
if (writer->wal_mode == WAL_NONE)
return;
struct wal_gc_msg msg;
- msg.lsn = lsn;
+ msg.wal_lsn = wal_lsn;
+ msg.checkpoint_lsn = checkpoint_lsn;
bool cancellable = fiber_set_cancellable(false);
cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe, &msg.base,
wal_collect_garbage_f, NULL, TIMEOUT_INFINITY);
@@ -620,11 +631,16 @@ wal_opt_rotate(struct wal_writer *writer)
/**
* Make sure there's enough disk space to append @len bytes
* of data to the current WAL.
+ *
+ * If fallocate() fails with ENOSPC, delete old WAL files
+ * that are not needed for recovery and retry.
*/
static int
wal_fallocate(struct wal_writer *writer, size_t len)
{
+ bool warn_no_space = true;
struct xlog *l = &writer->current_wal;
+ struct errinj *errinj = errinj(ERRINJ_WAL_FALLOCATE, ERRINJ_INT);
/*
* The actual write size can be greater than the sum size
@@ -633,11 +649,41 @@ wal_fallocate(struct wal_writer *writer, size_t len)
*/
len *= 2;
- if (l->allocated >= len)
- return 0;
- if (xlog_fallocate(l, MAX(len, WAL_FALLOCATE_LEN)) == 0)
- return 0;
+retry:
+ if (errinj == NULL || errinj->iparam == 0) {
+ if (l->allocated >= len)
+ return 0;
+ if (xlog_fallocate(l, MAX(len, WAL_FALLOCATE_LEN)) == 0)
+ return 0;
+ } else {
+ errinj->iparam--;
+ diag_set(ClientError, ER_INJECTION, "xlog fallocate");
+ errno = ENOSPC;
+ }
+ if (errno != ENOSPC)
+ goto error;
+ if (!xdir_has_garbage(&writer->wal_dir, writer->checkpoint_lsn))
+ goto error;
+
+ if (warn_no_space) {
+ say_crit("ran out of disk space, try to delete old WAL files");
+ warn_no_space = false;
+ }
+
+ /* Keep the original error. */
+ struct diag diag;
+ diag_create(&diag);
+ diag_move(diag_get(), &diag);
+ if (xdir_collect_garbage(&writer->wal_dir, writer->checkpoint_lsn,
+ XDIR_GC_REMOVE_ONE) != 0) {
+ diag_move(&diag, diag_get());
+ goto error;
+ }
+ diag_destroy(&diag);
+ wal_notify_watchers(writer, WAL_EVENT_GC);
+ goto retry;
+error:
diag_log();
return -1;
}
@@ -1027,13 +1073,16 @@ wal_watcher_notify(struct wal_watcher *watcher, unsigned events)
{
assert(!rlist_empty(&watcher->next));
+ struct wal_watcher_msg *msg = &watcher->msg;
+ struct wal_writer *writer = &wal_writer_singleton;
+
events &= watcher->event_mask;
if (events == 0) {
/* The watcher isn't interested in this event. */
return;
}
- if (watcher->msg.cmsg.route != NULL) {
+ if (msg->cmsg.route != NULL) {
/*
* If the notification message is still en route,
* mark the watcher to resend it as soon as it
@@ -1043,9 +1092,12 @@ wal_watcher_notify(struct wal_watcher *watcher, unsigned events)
return;
}
- watcher->msg.events = events;
- cmsg_init(&watcher->msg.cmsg, watcher->route);
- cpipe_push(&watcher->watcher_pipe, &watcher->msg.cmsg);
+ msg->events = events;
+ if (xdir_first_vclock(&writer->wal_dir, &msg->gc_vclock) < 0)
+ vclock_copy(&msg->gc_vclock, &writer->vclock);
+
+ cmsg_init(&msg->cmsg, watcher->route);
+ cpipe_push(&watcher->watcher_pipe, &msg->cmsg);
}
static void
diff --git a/src/box/wal.h b/src/box/wal.h
index e8a4299c..d4a37c55 100644
--- a/src/box/wal.h
+++ b/src/box/wal.h
@@ -35,9 +35,9 @@
#include "small/rlist.h"
#include "cbus.h"
#include "journal.h"
+#include "vclock.h"
struct fiber;
-struct vclock;
struct wal_writer;
struct tt_uuid;
@@ -56,9 +56,9 @@ void
wal_thread_start();
int
-wal_init(enum wal_mode wal_mode, const char *wal_dirname,
- const struct tt_uuid *instance_uuid, struct vclock *vclock,
- int64_t wal_max_rows, int64_t wal_max_size);
+wal_init(enum wal_mode wal_mode, const char *wal_dirname, int64_t wal_max_rows,
+ int64_t wal_max_size, const struct tt_uuid *instance_uuid,
+ const struct vclock *vclock, int64_t first_checkpoint_lsn);
void
wal_thread_stop();
@@ -73,6 +73,8 @@ struct wal_watcher_msg {
struct wal_watcher *watcher;
/** Bit mask of events, see wal_event. */
unsigned events;
+ /** VClock of the oldest stored WAL row. */
+ struct vclock gc_vclock;
};
enum wal_event {
@@ -80,6 +82,11 @@ enum wal_event {
WAL_EVENT_WRITE = (1 << 0),
/** A new WAL is created. */
WAL_EVENT_ROTATE = (1 << 1),
+ /**
+ * The WAL thread ran out of disk space and had to delete
+ * one or more old WAL files.
+ **/
+ WAL_EVENT_GC = (1 << 2),
};
struct wal_watcher {
@@ -168,11 +175,14 @@ int
wal_checkpoint(struct vclock *vclock, bool rotate);
/**
- * Remove WAL files that are not needed to recover
- * from snapshot with @lsn or newer.
+ * Remove all WAL files whose signature is less than @wal_lsn.
+ * Update the oldest checkpoint signature with @checkpoint_lsn.
+ * WAL thread will delete WAL files that are not needed to
+ * recover from the oldest checkpoint if it runs out of disk
+ * space.
*/
void
-wal_collect_garbage(int64_t lsn);
+wal_collect_garbage(int64_t wal_lsn, int64_t checkpoint_lsn);
void
wal_init_vy_log();
diff --git a/src/box/xlog.c b/src/box/xlog.c
index 0f2f97b7..191fadcd 100644
--- a/src/box/xlog.c
+++ b/src/box/xlog.c
@@ -678,6 +678,9 @@ xdir_collect_garbage(struct xdir *dir, int64_t signature, unsigned flags)
say_info("removed %s", filename);
vclockset_remove(&dir->index, vclock);
free(vclock);
+
+ if (flags & XDIR_GC_REMOVE_ONE)
+ break;
}
return 0;
}
diff --git a/src/box/xlog.h b/src/box/xlog.h
index e2fdfd74..7c69cc4f 100644
--- a/src/box/xlog.h
+++ b/src/box/xlog.h
@@ -179,6 +179,20 @@ xdir_format_filename(struct xdir *dir, int64_t signature,
enum log_suffix suffix);
/**
+ * Return true if the given directory index has files whose
+ * signature is less than specified.
+ *
+ * Supposed to be used to check if xdir_collect_garbage() can
+ * actually delete some files.
+ */
+static inline bool
+xdir_has_garbage(struct xdir *dir, int64_t signature)
+{
+ struct vclock *vclock = vclockset_first(&dir->index);
+ return vclock != NULL && vclock_sum(vclock) < signature;
+}
+
+/**
* Flags passed to xdir_collect_garbage().
*/
enum {
@@ -187,6 +201,10 @@ enum {
* the caller thread.
*/
XDIR_GC_USE_COIO = 1 << 0,
+ /**
+ * Return after removing a file.
+ */
+ XDIR_GC_REMOVE_ONE = 1 << 1,
};
/**
@@ -203,6 +221,21 @@ void
xdir_collect_inprogress(struct xdir *xdir);
/**
+ * Return LSN and vclock (unless @vclock is NULL) of the oldest
+ * file in a directory or -1 if the directory is empty.
+ */
+static inline int64_t
+xdir_first_vclock(struct xdir *xdir, struct vclock *vclock)
+{
+ struct vclock *first = vclockset_first(&xdir->index);
+ if (first == NULL)
+ return -1;
+ if (vclock != NULL)
+ vclock_copy(vclock, first);
+ return vclock_sum(first);
+}
+
+/**
* Return LSN and vclock (unless @vclock is NULL) of the newest
* file in a directory or -1 if the directory is empty.
*/
diff --git a/src/errinj.h b/src/errinj.h
index 84a1fbb5..50062b62 100644
--- a/src/errinj.h
+++ b/src/errinj.h
@@ -79,6 +79,7 @@ struct errinj {
_(ERRINJ_WAL_WRITE_DISK, ERRINJ_BOOL, {.bparam = false}) \
_(ERRINJ_WAL_WRITE_EOF, ERRINJ_BOOL, {.bparam = false}) \
_(ERRINJ_WAL_DELAY, ERRINJ_BOOL, {.bparam = false}) \
+ _(ERRINJ_WAL_FALLOCATE, ERRINJ_INT, {.iparam = 0}) \
_(ERRINJ_INDEX_ALLOC, ERRINJ_BOOL, {.bparam = false}) \
_(ERRINJ_TUPLE_ALLOC, ERRINJ_BOOL, {.bparam = false}) \
_(ERRINJ_TUPLE_FIELD, ERRINJ_BOOL, {.bparam = false}) \
diff --git a/test/box/errinj.result b/test/box/errinj.result
index c4a1326c..62dcc6a4 100644
--- a/test/box/errinj.result
+++ b/test/box/errinj.result
@@ -44,8 +44,8 @@ errinj.info()
state: false
ERRINJ_WAL_IO:
state: false
- ERRINJ_VY_INDEX_FILE_RENAME:
- state: false
+ ERRINJ_WAL_FALLOCATE:
+ state: 0
ERRINJ_TUPLE_ALLOC:
state: false
ERRINJ_VY_RUN_FILE_RENAME:
@@ -54,18 +54,20 @@ errinj.info()
state: false
ERRINJ_RELAY_REPORT_INTERVAL:
state: 0
+ ERRINJ_SNAP_COMMIT_DELAY:
+ state: false
ERRINJ_VY_READ_PAGE_TIMEOUT:
state: 0
ERRINJ_XLOG_META:
state: false
- ERRINJ_SNAP_COMMIT_DELAY:
- state: false
ERRINJ_WAL_BREAK_LSN:
state: -1
- ERRINJ_WAL_WRITE_DISK:
- state: false
ERRINJ_RELAY_BREAK_LSN:
state: -1
+ ERRINJ_WAL_WRITE_DISK:
+ state: false
+ ERRINJ_VY_INDEX_FILE_RENAME:
+ state: false
ERRINJ_VY_LOG_FILE_RENAME:
state: false
ERRINJ_VY_RUN_WRITE:
@@ -100,11 +102,11 @@ errinj.info()
state: false
ERRINJ_INDEX_ALLOC:
state: false
- ERRINJ_RELAY_TIMEOUT:
+ ERRINJ_VY_RUN_WRITE_TIMEOUT:
state: 0
ERRINJ_TESTING:
state: false
- ERRINJ_VY_RUN_WRITE_TIMEOUT:
+ ERRINJ_RELAY_TIMEOUT:
state: 0
ERRINJ_VY_SQUASH_TIMEOUT:
state: 0
diff --git a/test/replication/gc_no_space.result b/test/replication/gc_no_space.result
new file mode 100644
index 00000000..8e663cdf
--- /dev/null
+++ b/test/replication/gc_no_space.result
@@ -0,0 +1,234 @@
+--
+-- This test checks that when the WAL thread runs out of disk
+-- space it automatically deletes old WAL files and notifies
+-- the TX thread so that the latter can shoot off WAL consumers
+-- that need them. See gh-3397.
+--
+test_run = require('test_run').new()
+---
+...
+engine = test_run:get_cfg('engine')
+---
+...
+fio = require('fio')
+---
+...
+errinj = box.error.injection
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+function check_file_count(dir, glob, count)
+ local files = fio.glob(fio.pathjoin(dir, glob))
+ if #files == count then
+ return true
+ end
+ return false, files
+end;
+---
+...
+function check_wal_count(count)
+ return check_file_count(box.cfg.wal_dir, '*.xlog', count)
+end;
+---
+...
+function check_snap_count(count)
+ return check_file_count(box.cfg.memtx_dir, '*.snap', count)
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+default_checkpoint_count = box.cfg.checkpoint_count
+---
+...
+box.cfg{checkpoint_count = 2}
+---
+...
+test_run:cleanup_cluster()
+---
+...
+box.schema.user.grant('guest', 'replication')
+---
+...
+s = box.schema.space.create('test', {engine = engine})
+---
+...
+_ = s:create_index('pk')
+---
+...
+box.snapshot()
+---
+- ok
+...
+--
+-- Create a few dead replicas to pin WAL files.
+--
+test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
+---
+- true
+...
+test_run:cmd("start server replica")
+---
+- true
+...
+test_run:cmd("stop server replica")
+---
+- true
+...
+test_run:cmd("cleanup server replica")
+---
+- true
+...
+s:auto_increment{}
+---
+- [1]
+...
+box.snapshot()
+---
+- ok
+...
+test_run:cmd("start server replica")
+---
+- true
+...
+test_run:cmd("stop server replica")
+---
+- true
+...
+test_run:cmd("cleanup server replica")
+---
+- true
+...
+s:auto_increment{}
+---
+- [2]
+...
+box.snapshot()
+---
+- ok
+...
+test_run:cmd("start server replica")
+---
+- true
+...
+test_run:cmd("stop server replica")
+---
+- true
+...
+test_run:cmd("cleanup server replica")
+---
+- true
+...
+test_run:cmd("delete server replica")
+---
+- true
+...
+--
+-- Make a few checkpoints and check that old WAL files are not
+-- deleted.
+--
+s:auto_increment{}
+---
+- [3]
+...
+box.snapshot()
+---
+- ok
+...
+s:auto_increment{}
+---
+- [4]
+...
+box.snapshot()
+---
+- ok
+...
+s:auto_increment{}
+---
+- [5]
+...
+check_wal_count(7)
+---
+- true
+...
+check_snap_count(2)
+---
+- true
+...
+#box.info.gc().consumers -- 3
+---
+- 3
+...
+--
+-- Inject a ENOSPC error and check that the WAL thread deletes
+-- old WAL files to prevent the user from seeing the error.
+--
+errinj.set('ERRINJ_WAL_FALLOCATE', 3)
+---
+- ok
+...
+s:auto_increment{} -- success
+---
+- [6]
+...
+errinj.info()['ERRINJ_WAL_FALLOCATE'].state -- 0
+---
+- 0
+...
+check_wal_count(3)
+---
+- true
+...
+check_snap_count(2)
+---
+- true
+...
+#box.info.gc().consumers -- 1
+---
+- 1
+...
+--
+-- Check that the WAL thread never deletes WAL files that are
+-- needed for recovery from a checkpoint.
+--
+errinj.set('ERRINJ_WAL_FALLOCATE', 2)
+---
+- ok
+...
+s:auto_increment{} -- failure
+---
+- error: Failed to write to disk
+...
+errinj.info()['ERRINJ_WAL_FALLOCATE'].state -- 0
+---
+- 0
+...
+check_wal_count(2)
+---
+- true
+...
+check_snap_count(2)
+---
+- true
+...
+#box.info.gc().consumers -- 0
+---
+- 0
+...
+s:drop()
+---
+...
+box.schema.user.revoke('guest', 'replication')
+---
+...
+test_run:cleanup_cluster()
+---
+...
+box.cfg{checkpoint_count = default_checkpoint_count}
+---
+...
diff --git a/test/replication/gc_no_space.test.lua b/test/replication/gc_no_space.test.lua
new file mode 100644
index 00000000..4bab2b0e
--- /dev/null
+++ b/test/replication/gc_no_space.test.lua
@@ -0,0 +1,103 @@
+--
+-- This test checks that when the WAL thread runs out of disk
+-- space it automatically deletes old WAL files and notifies
+-- the TX thread so that the latter can shoot off WAL consumers
+-- that need them. See gh-3397.
+--
+test_run = require('test_run').new()
+engine = test_run:get_cfg('engine')
+
+fio = require('fio')
+errinj = box.error.injection
+
+test_run:cmd("setopt delimiter ';'")
+function check_file_count(dir, glob, count)
+ local files = fio.glob(fio.pathjoin(dir, glob))
+ if #files == count then
+ return true
+ end
+ return false, files
+end;
+function check_wal_count(count)
+ return check_file_count(box.cfg.wal_dir, '*.xlog', count)
+end;
+function check_snap_count(count)
+ return check_file_count(box.cfg.memtx_dir, '*.snap', count)
+end;
+test_run:cmd("setopt delimiter ''");
+
+default_checkpoint_count = box.cfg.checkpoint_count
+box.cfg{checkpoint_count = 2}
+
+test_run:cleanup_cluster()
+box.schema.user.grant('guest', 'replication')
+s = box.schema.space.create('test', {engine = engine})
+_ = s:create_index('pk')
+box.snapshot()
+
+--
+-- Create a few dead replicas to pin WAL files.
+--
+test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
+test_run:cmd("start server replica")
+test_run:cmd("stop server replica")
+test_run:cmd("cleanup server replica")
+
+s:auto_increment{}
+box.snapshot()
+
+test_run:cmd("start server replica")
+test_run:cmd("stop server replica")
+test_run:cmd("cleanup server replica")
+
+s:auto_increment{}
+box.snapshot()
+
+test_run:cmd("start server replica")
+test_run:cmd("stop server replica")
+test_run:cmd("cleanup server replica")
+test_run:cmd("delete server replica")
+
+--
+-- Make a few checkpoints and check that old WAL files are not
+-- deleted.
+--
+s:auto_increment{}
+box.snapshot()
+s:auto_increment{}
+box.snapshot()
+s:auto_increment{}
+
+check_wal_count(7)
+check_snap_count(2)
+#box.info.gc().consumers -- 3
+
+--
+-- Inject a ENOSPC error and check that the WAL thread deletes
+-- old WAL files to prevent the user from seeing the error.
+--
+errinj.set('ERRINJ_WAL_FALLOCATE', 3)
+s:auto_increment{} -- success
+errinj.info()['ERRINJ_WAL_FALLOCATE'].state -- 0
+
+check_wal_count(3)
+check_snap_count(2)
+#box.info.gc().consumers -- 1
+
+--
+-- Check that the WAL thread never deletes WAL files that are
+-- needed for recovery from a checkpoint.
+--
+errinj.set('ERRINJ_WAL_FALLOCATE', 2)
+s:auto_increment{} -- failure
+errinj.info()['ERRINJ_WAL_FALLOCATE'].state -- 0
+
+check_wal_count(2)
+check_snap_count(2)
+#box.info.gc().consumers -- 0
+
+s:drop()
+box.schema.user.revoke('guest', 'replication')
+test_run:cleanup_cluster()
+
+box.cfg{checkpoint_count = default_checkpoint_count}
diff --git a/test/replication/suite.ini b/test/replication/suite.ini
index f4abc7af..569c9048 100644
--- a/test/replication/suite.ini
+++ b/test/replication/suite.ini
@@ -3,7 +3,7 @@ core = tarantool
script = master.lua
description = tarantool/box, replication
disabled = consistent.test.lua
-release_disabled = catch.test.lua errinj.test.lua gc.test.lua before_replace.test.lua quorum.test.lua recover_missing_xlog.test.lua sync.test.lua
+release_disabled = catch.test.lua errinj.test.lua gc.test.lua gc_no_space.test.lua before_replace.test.lua quorum.test.lua recover_missing_xlog.test.lua sync.test.lua
config = suite.cfg
lua_libs = lua/fast_replica.lua lua/rlimit.lua
long_run = prune.test.lua
--
2.11.0
^ permalink raw reply [flat|nested] 10+ messages in thread