* [PATCH v2 00/10] Allow to limit size of WAL files
@ 2018-12-08 15:48 Vladimir Davydov
2018-12-08 15:48 ` [PATCH v2 01/10] gc: do not use WAL watcher API for deactivating stale consumers Vladimir Davydov
` (10 more replies)
0 siblings, 11 replies; 22+ messages in thread
From: Vladimir Davydov @ 2018-12-08 15:48 UTC (permalink / raw)
To: kostja; +Cc: tarantool-patches
Tarantool makes checkpoints every box.cfg.checkpoint_interval seconds
and keeps last box.cfg.checkpoint_count checkpoints. It also keeps all
intermediate WAL files. Currently, it isn't possible to set a checkpoint
trigger based on the total size of WAL files, which makes it difficult to
estimate the minimal amount of disk space that needs to be allotted to a
Tarantool instance for storing WALs to eliminate the possibility of
ENOSPC errors. For example, under normal conditions a Tarantool instance
may write 1 GB of WAL files every box.cfg.checkpoint_interval seconds
and so one may assume that 1 GB times box.cfg.checkpoint_count should be
enough for the WAL partition, but there's no guarantee it won't write 10
GB between checkpoints when the load is extreme.
So we've agreed that we must provide users with one more configuration
option that can be used to impose a limit on the total size of WAL
files. The new option is called box.cfg.checkpoint_wal_threshold. Once
the configured threshold is exceeded, the WAL thread notifies the
checkpoint daemon that it's time to make a new checkpoint and delete
old WAL files. Note, the new option only limits the size of WAL files
created since the last checkpoint, because backup WAL files are not
needed for recovery and can be deleted in case of emergency ENOSPC.
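For illustration, here is a sketch of how the WAL thread might enforce
the threshold, reusing the WAL-to-TX message pattern introduced in patch
1. The names checkpoint_wal_size, checkpoint_threshold,
checkpoint_triggered and tx_notify_checkpoint are hypothetical, made up
for this sketch; they are not necessarily what patch 10 uses:

	/*
	 * Hypothetical check run in the WAL thread after each write.
	 * Counts bytes written since the last checkpoint and notifies
	 * the TX thread once the configured threshold is exceeded.
	 */
	static void
	wal_check_checkpoint_threshold(struct wal_writer *writer, size_t written)
	{
		writer->checkpoint_wal_size += written;
		if (writer->checkpoint_triggered ||
		    writer->checkpoint_wal_size < writer->checkpoint_threshold)
			return;
		/* Notify TX only once until the next checkpoint. */
		writer->checkpoint_triggered = true;
		struct cmsg *msg = malloc(sizeof(*msg));
		if (msg != NULL) {
			static struct cmsg_hop route[] = {
				{ tx_notify_checkpoint, NULL },
			};
			cmsg_init(msg, route);
			cpipe_push(&wal_thread.tx_prio_pipe, msg);
		} else
			say_warn("failed to allocate checkpoint "
				 "notification message");
	}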
https://github.com/tarantool/tarantool/issues/1082
https://github.com/tarantool/tarantool/commits/dv/gh-1082-wal-checkpoint-threshold
v2 addresses Kostja's comments to v1:
https://www.freelists.org/post/tarantool-patches/PATCH-09-Allow-to-limit-size-of-WAL-files
The most important changes are:
- Don't piggyback on the WAL request for notifying TX about WAL events.
- Factor out checkpoint scheduling logic and cover it with a unit test.
- Move checkpoint daemon to the GC module.
- Don't treat checkpoint_wal_threshold=0 specially and change the
default value to 1 exabyte.
Vladimir Davydov (10):
gc: do not use WAL watcher API for deactivating stale consumers
wal: simplify watcher API
box: fix certain cfg options initialized twice on recovery
box: don't use box_checkpoint_is_in_progress outside box.cc
box: move checkpointing to gc module
gc: some renames
Introduce checkpoint schedule module
Rewrite checkpoint daemon in C
wal: pass struct instead of vclock to checkpoint methods
wal: trigger checkpoint if there are too many WALs
src/box/CMakeLists.txt | 2 +-
src/box/box.cc | 72 ++++------
src/box/box.h | 7 +-
src/box/checkpoint_schedule.c | 76 ++++++++++
src/box/checkpoint_schedule.h | 85 +++++++++++
src/box/gc.c | 246 +++++++++++++++++++++++++-------
src/box/gc.h | 72 +++++++---
src/box/lua/cfg.cc | 24 ++++
src/box/lua/checkpoint_daemon.lua | 136 ------------------
src/box/lua/init.c | 2 -
src/box/lua/load_cfg.lua | 13 +-
src/box/relay.cc | 12 +-
src/box/wal.c | 237 +++++++++++++++++++++++-------
src/box/wal.h | 85 ++++++-----
src/main.cc | 21 ++-
test/app-tap/init_script.result | 87 +++++------
test/box/admin.result | 2 +
test/box/cfg.result | 4 +
test/unit/CMakeLists.txt | 6 +
test/unit/checkpoint_schedule.c | 96 +++++++++++++
test/unit/checkpoint_schedule.result | 41 ++++++
test/xlog/checkpoint_daemon.result | 143 ++-----------------
test/xlog/checkpoint_daemon.test.lua | 61 +-------
test/xlog/checkpoint_threshold.result | 115 +++++++++++++++
test/xlog/checkpoint_threshold.test.lua | 63 ++++++++
test/xlog/suite.ini | 2 +-
26 files changed, 1119 insertions(+), 591 deletions(-)
create mode 100644 src/box/checkpoint_schedule.c
create mode 100644 src/box/checkpoint_schedule.h
delete mode 100644 src/box/lua/checkpoint_daemon.lua
create mode 100644 test/unit/checkpoint_schedule.c
create mode 100644 test/unit/checkpoint_schedule.result
create mode 100644 test/xlog/checkpoint_threshold.result
create mode 100644 test/xlog/checkpoint_threshold.test.lua
--
2.11.0
* [PATCH v2 01/10] gc: do not use WAL watcher API for deactivating stale consumers
2018-12-08 15:48 [PATCH v2 00/10] Allow to limit size of WAL files Vladimir Davydov
@ 2018-12-08 15:48 ` Vladimir Davydov
2018-12-08 21:41 ` Konstantin Osipov
2018-12-08 15:48 ` [PATCH v2 02/10] wal: simplify watcher API Vladimir Davydov
` (9 subsequent siblings)
10 siblings, 1 reply; 22+ messages in thread
From: Vladimir Davydov @ 2018-12-08 15:48 UTC (permalink / raw)
To: kostja; +Cc: tarantool-patches
The WAL thread may delete old WAL files if it gets an ENOSPC error.
Currently, we use the WAL watcher API to notify the TX thread about it so
that it can shoot off stale replicas. This looks ugly, because the WAL
watcher API was initially designed to propagate WAL changes to relay
threads and the new event WAL_EVENT_GC, which was introduced for
notifying about ENOSPC-driven garbage collection, isn't used anywhere
else. Besides, there's already a pipe from WAL to TX - we could reuse it
instead of opening another one.
If we went further down that path, then in order to trigger a checkpoint
from the WAL thread (see #1082), we would have to introduce yet another
esoteric WAL watcher event, making the whole design look even uglier.
With that in mind, let's rewrite the garbage collection notification
procedure using a plain callback instead of abusing the WAL watcher API.
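Condensed, the resulting notification path (all names as in the diff
below) looks like this:

	/* wal.h: callback invoked in TX when WAL deletes old files on ENOSPC. */
	typedef void (*wal_on_garbage_collection_f)(const struct vclock *vclock);

	/* box.cc: the callback is registered via wal_init(). */
	static void
	on_wal_garbage_collection(const struct vclock *vclock)
	{
		gc_advance(vclock);
	}

	/*
	 * wal.c: after deleting old files on ENOSPC, the WAL thread pushes
	 * a tx_notify_gc_msg carrying the vclock of the oldest preserved
	 * row to tx_prio_pipe; the TX thread runs the callback on it.
	 */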
---
src/box/box.cc | 9 +++++--
src/box/gc.c | 33 ++++---------------------
src/box/gc.h | 19 ++++++---------
src/box/wal.c | 76 ++++++++++++++++++++++++++++++++++++++++++++++++----------
src/box/wal.h | 19 ++++++++-------
5 files changed, 92 insertions(+), 64 deletions(-)
diff --git a/src/box/box.cc b/src/box/box.cc
index bb7c1bb9..20412af4 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -2011,6 +2011,12 @@ tx_prio_cb(struct ev_loop *loop, ev_watcher *watcher, int events)
cbus_process(endpoint);
}
+static void
+on_wal_garbage_collection(const struct vclock *vclock)
+{
+ gc_advance(vclock);
+}
+
void
box_init(void)
{
@@ -2125,10 +2131,9 @@ box_cfg_xc(void)
enum wal_mode wal_mode = box_check_wal_mode(cfg_gets("wal_mode"));
if (wal_init(wal_mode, cfg_gets("wal_dir"), wal_max_rows,
wal_max_size, &INSTANCE_UUID, &replicaset.vclock,
- &checkpoint->vclock) != 0) {
+ &checkpoint->vclock, on_wal_garbage_collection) != 0) {
diag_raise();
}
- gc_set_wal_watcher();
rmean_cleanup(rmean_box);
diff --git a/src/box/gc.c b/src/box/gc.c
index 9c049977..87273b8d 100644
--- a/src/box/gc.c
+++ b/src/box/gc.c
@@ -113,26 +113,6 @@ gc_init(void)
fiber_start(gc.fiber);
}
-static void
-gc_process_wal_event(struct wal_watcher_msg *);
-
-void
-gc_set_wal_watcher(void)
-{
- /*
- * Since the function is called from box_cfg() it is
- * important that we do not pass a message processing
- * callback to wal_set_watcher(). Doing so would cause
- * credentials corruption in the fiber executing
- * box_cfg() in case it processes some iproto messages.
- * Besides, by the time the function is called
- * tx_fiber_pool is already set up and it will process
- * all the messages directed to "tx" endpoint safely.
- */
- wal_set_watcher(&gc.wal_watcher, "tx", gc_process_wal_event,
- NULL, WAL_EVENT_GC);
-}
-
void
gc_free(void)
{
@@ -270,25 +250,20 @@ gc_wait(void)
fiber_cond_wait(&gc.cond);
}
-/**
- * Deactivate consumers that need files deleted by the WAL thread.
- */
-static void
-gc_process_wal_event(struct wal_watcher_msg *msg)
+void
+gc_advance(const struct vclock *vclock)
{
- assert((msg->events & WAL_EVENT_GC) != 0);
-
/*
* In case of emergency ENOSPC, the WAL thread may delete
* WAL files needed to restore from backup checkpoints,
* which would be kept by the garbage collector otherwise.
* Bring the garbage collector vclock up to date.
*/
- vclock_copy(&gc.vclock, &msg->gc_vclock);
+ vclock_copy(&gc.vclock, vclock);
struct gc_consumer *consumer = gc_tree_first(&gc.consumers);
while (consumer != NULL &&
- vclock_sum(&consumer->vclock) < vclock_sum(&msg->gc_vclock)) {
+ vclock_sum(&consumer->vclock) < vclock_sum(vclock)) {
struct gc_consumer *next = gc_tree_next(&gc.consumers,
consumer);
assert(!consumer->is_inactive);
diff --git a/src/box/gc.h b/src/box/gc.h
index 6e96d7bb..a141ace6 100644
--- a/src/box/gc.h
+++ b/src/box/gc.h
@@ -36,7 +36,6 @@
#include "fiber_cond.h"
#include "vclock.h"
-#include "wal.h"
#include "trivia/util.h"
#if defined(__cplusplus)
@@ -122,11 +121,6 @@ struct gc_state {
struct rlist checkpoints;
/** Registered consumers, linked by gc_consumer::node. */
gc_tree_t consumers;
- /**
- * WAL event watcher. Needed to shoot off stale consumers
- * when a WAL file is deleted due to ENOSPC.
- */
- struct wal_watcher wal_watcher;
/** Fiber that removes old files in the background. */
struct fiber *fiber;
/**
@@ -192,12 +186,6 @@ void
gc_init(void);
/**
- * Set WAL watcher. Called after WAL is initialized.
- */
-void
-gc_set_wal_watcher(void);
-
-/**
* Destroy the garbage collection state.
*/
void
@@ -211,6 +199,13 @@ void
gc_wait(void);
/**
+ * Advance the garbage collector vclock to the given position.
+ * Deactivate WAL consumers that need older data.
+ */
+void
+gc_advance(const struct vclock *vclock);
+
+/**
* Update the minimal number of checkpoints to preserve.
* Called when box.cfg.checkpoint_count is updated.
*
diff --git a/src/box/wal.c b/src/box/wal.c
index 3b5b9492..0775dbae 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -91,6 +91,7 @@ struct wal_writer
{
struct journal base;
/* ----------------- tx ------------------- */
+ wal_on_garbage_collection_f on_garbage_collection;
/**
* The rollback queue. An accumulator for all requests
* that need to be rolled back. Also acts as a valve
@@ -254,6 +255,7 @@ tx_schedule_queue(struct stailq *queue)
static void
tx_schedule_commit(struct cmsg *msg)
{
+ struct wal_writer *writer = &wal_writer_singleton;
struct wal_msg *batch = (struct wal_msg *) msg;
/*
* Move the rollback list to the writer first, since
@@ -261,7 +263,6 @@ tx_schedule_commit(struct cmsg *msg)
* iteration of tx_schedule_queue loop.
*/
if (! stailq_empty(&batch->rollback)) {
- struct wal_writer *writer = &wal_writer_singleton;
/* Closes the input valve. */
stailq_concat(&writer->rollback, &batch->rollback);
}
@@ -286,6 +287,28 @@ tx_schedule_rollback(struct cmsg *msg)
stailq_create(&writer->rollback);
}
+
+/**
+ * This message is sent from WAL to TX when the WAL thread hits
+ * ENOSPC and has to delete some backup WAL files to continue.
+ * The TX thread uses this message to shoot off WAL consumers
+ * that needed the deleted WAL files.
+ */
+struct tx_notify_gc_msg {
+ struct cmsg base;
+ /** VClock of the oldest WAL row preserved by WAL. */
+ struct vclock vclock;
+};
+
+static void
+tx_notify_gc(struct cmsg *msg)
+{
+ struct wal_writer *writer = &wal_writer_singleton;
+ struct vclock *vclock = &((struct tx_notify_gc_msg *)msg)->vclock;
+ writer->on_garbage_collection(vclock);
+ free(msg);
+}
+
/**
* Initialize WAL writer context. Even though it's a singleton,
* encapsulate the details just in case we may use
@@ -296,7 +319,8 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
const char *wal_dirname, int64_t wal_max_rows,
int64_t wal_max_size, const struct tt_uuid *instance_uuid,
const struct vclock *vclock,
- const struct vclock *checkpoint_vclock)
+ const struct vclock *checkpoint_vclock,
+ wal_on_garbage_collection_f on_garbage_collection)
{
writer->wal_mode = wal_mode;
writer->wal_max_rows = wal_max_rows;
@@ -315,6 +339,8 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
vclock_copy(&writer->vclock, vclock);
vclock_copy(&writer->checkpoint_vclock, checkpoint_vclock);
rlist_create(&writer->watchers);
+
+ writer->on_garbage_collection = on_garbage_collection;
}
/** Destroy a WAL writer structure. */
@@ -419,14 +445,15 @@ wal_open(struct wal_writer *writer)
int
wal_init(enum wal_mode wal_mode, const char *wal_dirname, int64_t wal_max_rows,
int64_t wal_max_size, const struct tt_uuid *instance_uuid,
- const struct vclock *vclock, const struct vclock *checkpoint_vclock)
+ const struct vclock *vclock, const struct vclock *checkpoint_vclock,
+ wal_on_garbage_collection_f on_garbage_collection)
{
assert(wal_max_rows > 1);
struct wal_writer *writer = &wal_writer_singleton;
wal_writer_create(writer, wal_mode, wal_dirname, wal_max_rows,
wal_max_size, instance_uuid, vclock,
- checkpoint_vclock);
+ checkpoint_vclock, on_garbage_collection);
/*
* Scan the WAL directory to build an index of all
@@ -673,9 +700,10 @@ wal_opt_rotate(struct wal_writer *writer)
static int
wal_fallocate(struct wal_writer *writer, size_t len)
{
- bool warn_no_space = true;
+ bool warn_no_space = true, notify_gc = false;
struct xlog *l = &writer->current_wal;
struct errinj *errinj = errinj(ERRINJ_WAL_FALLOCATE, ERRINJ_INT);
+ int rc = 0;
/*
* Max LSN that can be collected in case of ENOSPC -
@@ -693,9 +721,9 @@ wal_fallocate(struct wal_writer *writer, size_t len)
retry:
if (errinj == NULL || errinj->iparam == 0) {
if (l->allocated >= len)
- return 0;
+ goto out;
if (xlog_fallocate(l, MAX(len, WAL_FALLOCATE_LEN)) == 0)
- return 0;
+ goto out;
} else {
errinj->iparam--;
diag_set(ClientError, ER_INJECTION, "xlog fallocate");
@@ -722,11 +750,37 @@ retry:
}
diag_destroy(&diag);
- wal_notify_watchers(writer, WAL_EVENT_GC);
+ notify_gc = true;
goto retry;
error:
diag_log();
- return -1;
+ rc = -1;
+out:
+ /*
+ * Notify the TX thread if the WAL thread had to delete
+ * some WAL files to proceed so that TX can shoot off WAL
+ * consumers that still need those files.
+ *
+ * We allocate the message with malloc() and we ignore
+ * allocation failures, because this is a pretty rare
+ * event and a failure to send this message isn't really
+ * critical.
+ */
+ if (notify_gc) {
+ static struct cmsg_hop route[] = {
+ { tx_notify_gc, NULL },
+ };
+ struct tx_notify_gc_msg *msg = malloc(sizeof(*msg));
+ if (msg != NULL) {
+ if (xdir_first_vclock(&writer->wal_dir,
+ &msg->vclock) < 0)
+ vclock_copy(&msg->vclock, &writer->vclock);
+ cmsg_init(&msg->base, route);
+ cpipe_push(&wal_thread.tx_prio_pipe, &msg->base);
+ } else
+ say_warn("failed to allocate gc notification message");
+ }
+ return rc;
}
static void
@@ -1115,7 +1169,6 @@ wal_watcher_notify(struct wal_watcher *watcher, unsigned events)
assert(!rlist_empty(&watcher->next));
struct wal_watcher_msg *msg = &watcher->msg;
- struct wal_writer *writer = &wal_writer_singleton;
events &= watcher->event_mask;
if (events == 0) {
@@ -1134,9 +1187,6 @@ wal_watcher_notify(struct wal_watcher *watcher, unsigned events)
}
msg->events = events;
- if (xdir_first_vclock(&writer->wal_dir, &msg->gc_vclock) < 0)
- vclock_copy(&msg->gc_vclock, &writer->vclock);
-
cmsg_init(&msg->cmsg, watcher->route);
cpipe_push(&watcher->watcher_pipe, &msg->cmsg);
}
diff --git a/src/box/wal.h b/src/box/wal.h
index 3c9eb42f..6e5a5458 100644
--- a/src/box/wal.h
+++ b/src/box/wal.h
@@ -52,13 +52,23 @@ extern int wal_dir_lock;
extern "C" {
#endif /* defined(__cplusplus) */
+/**
+ * Callback invoked in the TX thread when the WAL thread runs out
+ * of disk space and has to delete some old WAL files to continue.
+ * It is supposed to shoot off WAL consumers that need the deleted
+ * files. The vclock of the oldest WAL row still available on the
+ * instance is passed in @vclock.
+ */
+typedef void (*wal_on_garbage_collection_f)(const struct vclock *vclock);
+
void
wal_thread_start();
int
wal_init(enum wal_mode wal_mode, const char *wal_dirname, int64_t wal_max_rows,
int64_t wal_max_size, const struct tt_uuid *instance_uuid,
- const struct vclock *vclock, const struct vclock *checkpoint_vclock);
+ const struct vclock *vclock, const struct vclock *checkpoint_vclock,
+ wal_on_garbage_collection_f on_garbage_collection);
void
wal_thread_stop();
@@ -73,8 +83,6 @@ struct wal_watcher_msg {
struct wal_watcher *watcher;
/** Bit mask of events, see wal_event. */
unsigned events;
- /** VClock of the oldest stored WAL row. */
- struct vclock gc_vclock;
};
enum wal_event {
@@ -82,11 +90,6 @@ enum wal_event {
WAL_EVENT_WRITE = (1 << 0),
/** A new WAL is created. */
WAL_EVENT_ROTATE = (1 << 1),
- /**
- * The WAL thread ran out of disk space and had to delete
- * one or more old WAL files.
- **/
- WAL_EVENT_GC = (1 << 2),
};
struct wal_watcher {
--
2.11.0
* [PATCH v2 02/10] wal: simplify watcher API
2018-12-08 15:48 [PATCH v2 00/10] Allow to limit size of WAL files Vladimir Davydov
2018-12-08 15:48 ` [PATCH v2 01/10] gc: do not use WAL watcher API for deactivating stale consumers Vladimir Davydov
@ 2018-12-08 15:48 ` Vladimir Davydov
2018-12-08 21:41 ` Konstantin Osipov
2018-12-08 15:48 ` [PATCH v2 03/10] box: fix certain cfg options initialized twice on recovery Vladimir Davydov
` (8 subsequent siblings)
10 siblings, 1 reply; 22+ messages in thread
From: Vladimir Davydov @ 2018-12-08 15:48 UTC (permalink / raw)
To: kostja; +Cc: tarantool-patches
This patch reverts the changes done to make the WAL watcher API
suitable for notifying TX about WAL garbage collection triggered on
ENOSPC, namely:
b073b0176704 wal: add event_mask to wal_watcher
7077341ec5b3 wal: pass wal_watcher_msg to wal_watcher callback
We don't need them anymore: as of the previous patch, the WAL thread
notifies TX about ENOSPC-driven garbage collection via a plain callback
instead of the watcher API.
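After this patch the watcher API boils down to the following (condensed
from the diff below):

	/* The callback receives the watcher and an event mask directly. */
	void (*cb)(struct wal_watcher *, unsigned events);

	/* Registration no longer takes an event mask (relay.cc): */
	wal_set_watcher(&relay->wal_watcher, cord_name(cord()),
			relay_process_wal_event, cbus_process);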
---
src/box/relay.cc | 12 ++++--------
src/box/wal.c | 27 ++++++---------------------
src/box/wal.h | 25 ++++++-------------------
3 files changed, 16 insertions(+), 48 deletions(-)
diff --git a/src/box/relay.cc b/src/box/relay.cc
index f864d308..a01c2a2e 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -406,12 +406,9 @@ relay_schedule_pending_gc(struct relay *relay, const struct vclock *vclock)
}
static void
-relay_process_wal_event(struct wal_watcher_msg *msg)
+relay_process_wal_event(struct wal_watcher *watcher, unsigned events)
{
- assert((msg->events & (WAL_EVENT_WRITE | WAL_EVENT_ROTATE)) != 0);
-
- struct relay *relay = container_of(msg->watcher, struct relay,
- wal_watcher);
+ struct relay *relay = container_of(watcher, struct relay, wal_watcher);
if (relay->state != RELAY_FOLLOW) {
/*
* Do not try to send anything to the replica
@@ -421,7 +418,7 @@ relay_process_wal_event(struct wal_watcher_msg *msg)
}
try {
recover_remaining_wals(relay->r, &relay->stream, NULL,
- (msg->events & WAL_EVENT_ROTATE) != 0);
+ (events & WAL_EVENT_ROTATE) != 0);
} catch (Exception *e) {
e->log();
diag_move(diag_get(), &relay->diag);
@@ -507,8 +504,7 @@ relay_subscribe_f(va_list ap)
};
trigger_add(&r->on_close_log, &on_close_log);
wal_set_watcher(&relay->wal_watcher, cord_name(cord()),
- relay_process_wal_event, cbus_process,
- WAL_EVENT_WRITE | WAL_EVENT_ROTATE);
+ relay_process_wal_event, cbus_process);
relay_set_cord_name(relay->io.fd);
diff --git a/src/box/wal.c b/src/box/wal.c
index 0775dbae..644f58c8 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -1169,13 +1169,6 @@ wal_watcher_notify(struct wal_watcher *watcher, unsigned events)
assert(!rlist_empty(&watcher->next));
struct wal_watcher_msg *msg = &watcher->msg;
-
- events &= watcher->event_mask;
- if (events == 0) {
- /* The watcher isn't interested in this event. */
- return;
- }
-
if (msg->cmsg.route != NULL) {
/*
* If the notification message is still en route,
@@ -1195,7 +1188,10 @@ static void
wal_watcher_notify_perform(struct cmsg *cmsg)
{
struct wal_watcher_msg *msg = (struct wal_watcher_msg *) cmsg;
- msg->watcher->cb(msg);
+ struct wal_watcher *watcher = msg->watcher;
+ unsigned events = msg->events;
+
+ watcher->cb(watcher, events);
}
static void
@@ -1248,9 +1244,8 @@ wal_watcher_detach(void *arg)
void
wal_set_watcher(struct wal_watcher *watcher, const char *name,
- void (*watcher_cb)(struct wal_watcher_msg *),
- void (*process_cb)(struct cbus_endpoint *),
- unsigned event_mask)
+ void (*watcher_cb)(struct wal_watcher *, unsigned events),
+ void (*process_cb)(struct cbus_endpoint *))
{
assert(journal_is_initialized(&wal_writer_singleton.base));
@@ -1260,7 +1255,6 @@ wal_set_watcher(struct wal_watcher *watcher, const char *name,
watcher->msg.events = 0;
watcher->msg.cmsg.route = NULL;
watcher->pending_events = 0;
- watcher->event_mask = event_mask;
assert(lengthof(watcher->route) == 2);
watcher->route[0] = (struct cmsg_hop)
@@ -1281,15 +1275,6 @@ wal_clear_watcher(struct wal_watcher *watcher,
wal_watcher_detach, watcher, process_cb);
}
-/**
- * Notify all interested watchers about a WAL event.
- *
- * XXX: Note, this function iterates over all registered watchers,
- * including those that are not interested in the given event.
- * This is OK only as long as the number of events/watchers is
- * small. If this ever changes, we should consider maintaining
- * a separate watcher list per each event type.
- */
static void
wal_notify_watchers(struct wal_writer *writer, unsigned events)
{
diff --git a/src/box/wal.h b/src/box/wal.h
index 6e5a5458..3d616353 100644
--- a/src/box/wal.h
+++ b/src/box/wal.h
@@ -73,15 +73,9 @@ wal_init(enum wal_mode wal_mode, const char *wal_dirname, int64_t wal_max_rows,
void
wal_thread_stop();
-/**
- * A notification message sent from the WAL to a watcher
- * when a WAL event occurs.
- */
struct wal_watcher_msg {
struct cmsg cmsg;
- /** Pointer to the watcher this message is for. */
struct wal_watcher *watcher;
- /** Bit mask of events, see wal_event. */
unsigned events;
};
@@ -96,7 +90,7 @@ struct wal_watcher {
/** Link in wal_writer::watchers. */
struct rlist next;
/** The watcher callback function. */
- void (*cb)(struct wal_watcher_msg *);
+ void (*cb)(struct wal_watcher *, unsigned events);
/** Pipe from the watcher to WAL. */
struct cpipe wal_pipe;
/** Pipe from WAL to the watcher. */
@@ -106,11 +100,6 @@ struct wal_watcher {
/** Message sent to notify the watcher. */
struct wal_watcher_msg msg;
/**
- * Bit mask of WAL events that this watcher is
- * interested in.
- */
- unsigned event_mask;
- /**
* Bit mask of WAL events that happened while
* the notification message was en route.
* It indicates that the message must be resent
@@ -135,19 +124,17 @@ struct wal_watcher {
* @param watcher WAL watcher to register.
* @param name Name of the cbus endpoint at the caller's cord.
* @param watcher_cb Callback to invoke from the caller's cord
- * upon receiving a WAL event. It takes an object
- * of type wal_watcher_msg that stores a pointer
- * to the watcher and information about the event.
+ * upon receiving a WAL event. Apart from the
+ * watcher itself, it takes a bit mask of events.
+ * Events are described in wal_event enum.
* @param process_cb Function called to process cbus messages
* while the watcher is being attached or NULL
* if the cbus loop is running elsewhere.
- * @param event_mask Bit mask of events the watcher is interested in.
*/
void
wal_set_watcher(struct wal_watcher *watcher, const char *name,
- void (*watcher_cb)(struct wal_watcher_msg *),
- void (*process_cb)(struct cbus_endpoint *),
- unsigned event_mask);
+ void (*watcher_cb)(struct wal_watcher *, unsigned events),
+ void (*process_cb)(struct cbus_endpoint *));
/**
* Unsubscribe from WAL events.
--
2.11.0
* [PATCH v2 03/10] box: fix certain cfg options initialized twice on recovery
2018-12-08 15:48 [PATCH v2 00/10] Allow to limit size of WAL files Vladimir Davydov
2018-12-08 15:48 ` [PATCH v2 01/10] gc: do not use WAL watcher API for deactivating stale consumers Vladimir Davydov
2018-12-08 15:48 ` [PATCH v2 02/10] wal: simplify watcher API Vladimir Davydov
@ 2018-12-08 15:48 ` Vladimir Davydov
2018-12-08 21:42 ` Konstantin Osipov
2018-12-08 15:48 ` [PATCH v2 04/10] box: don't use box_checkpoint_is_in_progress outside box.cc Vladimir Davydov
` (7 subsequent siblings)
10 siblings, 1 reply; 22+ messages in thread
From: Vladimir Davydov @ 2018-12-08 15:48 UTC (permalink / raw)
To: kostja; +Cc: tarantool-patches
Certain dynamic configuration options are initialized right in box.cc,
because they are needed for recovery. All such options are supposed to
be present in dynamic_cfg_skip_at_load table so that load_cfg.lua won't
try to set them again upon recovery completion. However, not all of them
happen to be there - sometimes we simply forgot to patch this table after
introducing a new configuration option. This patch adds all the missing
ones except checkpoint_count - there's no point in initializing
checkpoint_count in box.cc, so this patch removes it from box.cc instead.
---
src/box/box.cc | 1 -
src/box/lua/load_cfg.lua | 8 +++++++-
2 files changed, 7 insertions(+), 2 deletions(-)
diff --git a/src/box/box.cc b/src/box/box.cc
index 20412af4..036769a9 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -2073,7 +2073,6 @@ box_cfg_xc(void)
box_check_replicaset_uuid(&replicaset_uuid);
box_set_net_msg_max();
- box_set_checkpoint_count();
box_set_too_long_threshold();
box_set_replication_timeout();
box_set_replication_connect_timeout();
diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua
index fd99206f..38e742c8 100644
--- a/src/box/lua/load_cfg.lua
+++ b/src/box/lua/load_cfg.lua
@@ -250,21 +250,27 @@ local dynamic_cfg = {
}
local dynamic_cfg_skip_at_load = {
- wal_mode = true,
listen = true,
memtx_memory = true,
+ memtx_max_tuple_size = true,
vinyl_memory = true,
+ vinyl_max_tuple_size = true,
+ vinyl_cache = true,
+ vinyl_timeout = true,
+ too_long_threshold = true,
replication = true,
replication_timeout = true,
replication_connect_timeout = true,
replication_connect_quorum = true,
replication_sync_lag = true,
replication_sync_timeout = true,
+ replication_skip_conflict = true,
wal_dir_rescan_delay = true,
custom_proc_title = true,
force_recovery = true,
instance_uuid = true,
replicaset_uuid = true,
+ net_msg_max = true,
}
local function convert_gb(size)
--
2.11.0
* [PATCH v2 04/10] box: don't use box_checkpoint_is_in_progress outside box.cc
2018-12-08 15:48 [PATCH v2 00/10] Allow to limit size of WAL files Vladimir Davydov
` (2 preceding siblings ...)
2018-12-08 15:48 ` [PATCH v2 03/10] box: fix certain cfg options initialized twice on recovery Vladimir Davydov
@ 2018-12-08 15:48 ` Vladimir Davydov
2018-12-08 21:43 ` Konstantin Osipov
2018-12-08 15:48 ` [PATCH v2 05/10] box: move checkpointing to gc module Vladimir Davydov
` (6 subsequent siblings)
10 siblings, 1 reply; 22+ messages in thread
From: Vladimir Davydov @ 2018-12-08 15:48 UTC (permalink / raw)
To: kostja; +Cc: tarantool-patches
We only use box_checkpoint_is_in_progress in the SIGUSR1 signal handler
to print a warning in case checkpointing cannot be started because it's
already in progress in another fiber. Actually, it's not necessary to use
it there - instead we can simply log the error returned by box_checkpoint,
which will be a self-explanatory ER_CHECKPOINT_IN_PROGRESS in this case.
So let's make box_checkpoint_is_in_progress private to box.cc - this
will simplify moving the checkpoint daemon to the gc module.
While we are at it, remove the unused snapshot_version declaration.
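Condensed from the diff below, the SIGUSR1 handler now just spawns a
fiber that logs whatever box_checkpoint() returns:

	static int
	sig_checkpoint_f(va_list ap)
	{
		(void)ap;
		if (box_checkpoint() != 0)
			diag_log(); /* e.g. ER_CHECKPOINT_IN_PROGRESS */
		return 0;
	}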
---
src/box/box.cc | 12 ++++++++----
src/box/box.h | 5 -----
src/main.cc | 21 +++++++++++++++------
3 files changed, 23 insertions(+), 15 deletions(-)
diff --git a/src/box/box.cc b/src/box/box.cc
index 036769a9..d146ec16 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -87,10 +87,14 @@ static void title(const char *new_status)
systemd_snotify("STATUS=%s", status);
}
-bool box_checkpoint_is_in_progress = false;
const struct vclock *box_vclock = &replicaset.vclock;
/**
+ * Set if there's a fiber performing box_checkpoint() right now.
+ */
+static bool checkpoint_is_in_progress;
+
+/**
* Set if backup is in progress, i.e. box_backup_start() was
* called but box_backup_stop() hasn't been yet.
*/
@@ -2182,11 +2186,11 @@ box_checkpoint()
if (! is_box_configured)
return 0;
int rc = 0;
- if (box_checkpoint_is_in_progress) {
+ if (checkpoint_is_in_progress) {
diag_set(ClientError, ER_CHECKPOINT_IN_PROGRESS);
return -1;
}
- box_checkpoint_is_in_progress = true;
+ checkpoint_is_in_progress = true;
/* create checkpoint files */
latch_lock(&schema_lock);
if ((rc = engine_begin_checkpoint()))
@@ -2206,7 +2210,7 @@ end:
engine_abort_checkpoint();
latch_unlock(&schema_lock);
- box_checkpoint_is_in_progress = false;
+ checkpoint_is_in_progress = false;
/*
* Wait for background garbage collection that might
diff --git a/src/box/box.h b/src/box/box.h
index 7712c192..6de0691d 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -124,11 +124,6 @@ box_wait_ro(bool ro, double timeout);
void
box_set_orphan(bool orphan);
-/** True if snapshot is in progress. */
-extern bool box_checkpoint_is_in_progress;
-/** Incremented with each next snapshot. */
-extern uint32_t snapshot_version;
-
/**
* Iterate over all spaces and save them to the
* snapshot file.
diff --git a/src/main.cc b/src/main.cc
index 6fb0e479..993355ac 100644
--- a/src/main.cc
+++ b/src/main.cc
@@ -96,18 +96,27 @@ tarantool_uptime(void)
}
/**
-* Create a checkpoint from signal handler (SIGUSR1)
-*/
+ * Create a checkpoint from signal handler (SIGUSR1)
+ */
+static int
+sig_checkpoint_f(va_list ap)
+{
+ (void)ap;
+ if (box_checkpoint() != 0)
+ diag_log();
+ return 0;
+}
+
static void
sig_checkpoint(ev_loop * /* loop */, struct ev_signal * /* w */,
int /* revents */)
{
- if (box_checkpoint_is_in_progress) {
- say_warn("Checkpoint is already in progress,"
- " the signal is ignored");
+ struct fiber *f = fiber_new("checkpoint", sig_checkpoint_f);
+ if (f == NULL) {
+ say_warn("failed to allocate checkpoint fiber");
return;
}
- fiber_start(fiber_new_xc("checkpoint", (fiber_func)box_checkpoint));
+ fiber_wakeup(f);
}
static void
--
2.11.0
* [PATCH v2 05/10] box: move checkpointing to gc module
2018-12-08 15:48 [PATCH v2 00/10] Allow to limit size of WAL files Vladimir Davydov
` (3 preceding siblings ...)
2018-12-08 15:48 ` [PATCH v2 04/10] box: don't use box_checkpoint_is_in_progress outside box.cc Vladimir Davydov
@ 2018-12-08 15:48 ` Vladimir Davydov
2018-12-08 21:44 ` Konstantin Osipov
2018-12-08 15:48 ` [PATCH v2 06/10] gc: some renames Vladimir Davydov
` (5 subsequent siblings)
10 siblings, 1 reply; 22+ messages in thread
From: Vladimir Davydov @ 2018-12-08 15:48 UTC (permalink / raw)
To: kostja; +Cc: tarantool-patches
The garbage collection module seems to be the best place to accommodate
the checkpoint daemon, but before we can move the daemon there, we first
need to move the code that performs checkpoints, so as to avoid a
cross-dependency between box.cc and gc.c.
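For reference, the checkpoint protocol that moves into gc_checkpoint(),
condensed from the diff below:

	int rc;
	struct vclock vclock;

	latch_lock(&schema_lock);	/* no DDL while checkpointing */
	rc = engine_begin_checkpoint();	/* engines prepare a snapshot */
	if (rc == 0)
		rc = wal_begin_checkpoint(&vclock);	/* rotate WAL */
	if (rc == 0)
		rc = engine_commit_checkpoint(&vclock);
	if (rc == 0) {
		wal_commit_checkpoint(&vclock);
		gc_add_checkpoint(&vclock);	/* track it in GC state */
	} else {
		engine_abort_checkpoint();
	}
	latch_unlock(&schema_lock);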
---
src/box/box.cc | 44 +------------------------------------
src/box/gc.c | 68 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
src/box/gc.h | 28 ++++++++++++++++--------
3 files changed, 87 insertions(+), 53 deletions(-)
diff --git a/src/box/box.cc b/src/box/box.cc
index d146ec16..121ad787 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -90,11 +90,6 @@ static void title(const char *new_status)
const struct vclock *box_vclock = &replicaset.vclock;
/**
- * Set if there's a fiber performing box_checkpoint() right now.
- */
-static bool checkpoint_is_in_progress;
-
-/**
* Set if backup is in progress, i.e. box_backup_start() was
* called but box_backup_stop() hasn't been yet.
*/
@@ -2185,45 +2180,8 @@ box_checkpoint()
/* Signal arrived before box.cfg{} */
if (! is_box_configured)
return 0;
- int rc = 0;
- if (checkpoint_is_in_progress) {
- diag_set(ClientError, ER_CHECKPOINT_IN_PROGRESS);
- return -1;
- }
- checkpoint_is_in_progress = true;
- /* create checkpoint files */
- latch_lock(&schema_lock);
- if ((rc = engine_begin_checkpoint()))
- goto end;
-
- struct vclock vclock;
- if ((rc = wal_begin_checkpoint(&vclock)))
- goto end;
- if ((rc = engine_commit_checkpoint(&vclock)))
- goto end;
-
- wal_commit_checkpoint(&vclock);
- gc_add_checkpoint(&vclock);
-end:
- if (rc)
- engine_abort_checkpoint();
-
- latch_unlock(&schema_lock);
- checkpoint_is_in_progress = false;
-
- /*
- * Wait for background garbage collection that might
- * have been triggered by this checkpoint to complete.
- * Strictly speaking, it isn't necessary, but it
- * simplifies testing as it guarantees that by the
- * time box.snapshot() returns, all outdated checkpoint
- * files have been removed.
- */
- if (!rc)
- gc_wait();
-
- return rc;
+ return gc_checkpoint();
}
int
diff --git a/src/box/gc.c b/src/box/gc.c
index 87273b8d..153ef65c 100644
--- a/src/box/gc.c
+++ b/src/box/gc.c
@@ -45,11 +45,14 @@
#include <small/rlist.h>
#include "diag.h"
+#include "errcode.h"
#include "fiber.h"
#include "fiber_cond.h"
+#include "latch.h"
#include "say.h"
#include "vclock.h"
#include "cbus.h"
+#include "schema.h"
#include "engine.h" /* engine_collect_garbage() */
#include "wal.h" /* wal_collect_garbage() */
@@ -242,7 +245,11 @@ gc_schedule(void)
fiber_wakeup(gc.fiber);
}
-void
+/**
+ * Wait for background garbage collection scheduled prior
+ * to this point to complete.
+ */
+static void
gc_wait(void)
{
unsigned scheduled = gc.scheduled;
@@ -320,6 +327,65 @@ gc_add_checkpoint(const struct vclock *vclock)
gc_schedule();
}
+int
+gc_checkpoint(void)
+{
+ int rc;
+ struct vclock vclock;
+
+ if (gc.checkpoint_is_in_progress) {
+ diag_set(ClientError, ER_CHECKPOINT_IN_PROGRESS);
+ return -1;
+ }
+ gc.checkpoint_is_in_progress = true;
+
+ /*
+ * We don't support DDL operations while making a checkpoint.
+ * Lock them out.
+ */
+ latch_lock(&schema_lock);
+
+ /*
+ * Rotate WAL and call engine callbacks to create a checkpoint
+ * on disk for each registered engine.
+ */
+ rc = engine_begin_checkpoint();
+ if (rc != 0)
+ goto out;
+ rc = wal_begin_checkpoint(&vclock);
+ if (rc != 0)
+ goto out;
+ rc = engine_commit_checkpoint(&vclock);
+ if (rc != 0)
+ goto out;
+ wal_commit_checkpoint(&vclock);
+
+ /*
+ * Finally, track the newly created checkpoint in the garbage
+ * collector state.
+ */
+ gc_add_checkpoint(&vclock);
+out:
+ if (rc != 0)
+ engine_abort_checkpoint();
+
+ latch_unlock(&schema_lock);
+ gc.checkpoint_is_in_progress = false;
+
+ /*
+ * Wait for background garbage collection that might
+ * have been triggered by this checkpoint to complete.
+ * Strictly speaking, it isn't necessary, but it
+ * simplifies testing as it guarantees that by the
+ * time box.snapshot() returns, all outdated checkpoint
+ * files have been removed.
+ */
+ if (rc == 0)
+ gc_wait();
+
+ return rc;
+}
+
void
gc_ref_checkpoint(struct gc_checkpoint *checkpoint,
struct gc_checkpoint_ref *ref, const char *format, ...)
diff --git a/src/box/gc.h b/src/box/gc.h
index a141ace6..84a441f2 100644
--- a/src/box/gc.h
+++ b/src/box/gc.h
@@ -31,6 +31,7 @@
* SUCH DAMAGE.
*/
+#include <stdbool.h>
#include <stddef.h>
#include <small/rlist.h>
@@ -141,6 +142,10 @@ struct gc_state {
* taken at that moment of time.
*/
unsigned completed, scheduled;
+ /**
+ * Set if there's a fiber making a checkpoint right now.
+ */
+ bool checkpoint_is_in_progress;
};
extern struct gc_state gc;
@@ -192,13 +197,6 @@ void
gc_free(void);
/**
- * Wait for background garbage collection scheduled prior
- * to this point to complete.
- */
-void
-gc_wait(void);
-
-/**
* Advance the garbage collector vclock to the given position.
* Deactivate WAL consumers that need older data.
*/
@@ -217,14 +215,26 @@ void
gc_set_min_checkpoint_count(int min_checkpoint_count);
/**
- * Track a new checkpoint in the garbage collector state.
- * Note, this function may run garbage collector to remove
+ * Track an existing checkpoint in the garbage collector state.
+ * Note, this function may trigger garbage collection to remove
* old checkpoints.
*/
void
gc_add_checkpoint(const struct vclock *vclock);
/**
+ * Make a checkpoint.
+ *
+ * This function runs engine/WAL callbacks to create a checkpoint
+ * on disk, then tracks the new checkpoint in the garbage collector
+ * state (see gc_add_checkpoint()).
+ *
+ * Returns 0 on success. On failure returns -1 and sets diag.
+ */
+int
+gc_checkpoint(void);
+
+/**
* Get a reference to @checkpoint and store it in @ref.
* This will block the garbage collector from deleting
* the checkpoint files until the reference is released
--
2.11.0
* [PATCH v2 06/10] gc: some renames
2018-12-08 15:48 [PATCH v2 00/10] Allow to limit size of WAL files Vladimir Davydov
` (4 preceding siblings ...)
2018-12-08 15:48 ` [PATCH v2 05/10] box: move checkpointing to gc module Vladimir Davydov
@ 2018-12-08 15:48 ` Vladimir Davydov
2018-12-08 21:44 ` Konstantin Osipov
2018-12-08 15:48 ` [PATCH v2 07/10] Introduce checkpoint schedule module Vladimir Davydov
` (4 subsequent siblings)
10 siblings, 1 reply; 22+ messages in thread
From: Vladimir Davydov @ 2018-12-08 15:48 UTC (permalink / raw)
To: kostja; +Cc: tarantool-patches
The GC module is responsible not only for garbage collection, but also
for tracking consumers and making checkpoints. Soon it will also
incorporate the checkpoint daemon. Let's prefix all members related to
the cleanup procedure accordingly to avoid confusion.
---
src/box/gc.c | 50 +++++++++++++++++++++++++-------------------------
src/box/gc.h | 8 ++++----
2 files changed, 29 insertions(+), 29 deletions(-)
diff --git a/src/box/gc.c b/src/box/gc.c
index 153ef65c..e1b23eed 100644
--- a/src/box/gc.c
+++ b/src/box/gc.c
@@ -59,7 +59,7 @@
struct gc_state gc;
static int
-gc_fiber_f(va_list);
+gc_cleanup_fiber_f(va_list);
/**
* Comparator used for ordering gc_consumer objects by signature
@@ -107,13 +107,13 @@ gc_init(void)
vclock_create(&gc.vclock);
rlist_create(&gc.checkpoints);
gc_tree_new(&gc.consumers);
- fiber_cond_create(&gc.cond);
+ fiber_cond_create(&gc.cleanup_cond);
- gc.fiber = fiber_new("gc", gc_fiber_f);
- if (gc.fiber == NULL)
+ gc.cleanup_fiber = fiber_new("gc", gc_cleanup_fiber_f);
+ if (gc.cleanup_fiber == NULL)
panic("failed to start garbage collection fiber");
- fiber_start(gc.fiber);
+ fiber_start(gc.cleanup_fiber);
}
void
@@ -147,7 +147,7 @@ gc_free(void)
* this function is specified by box.cfg.checkpoint_count.
*/
static void
-gc_run(void)
+gc_run_cleanup(void)
{
bool run_wal_gc = false;
bool run_engine_gc = false;
@@ -209,20 +209,20 @@ gc_run(void)
}
static int
-gc_fiber_f(va_list ap)
+gc_cleanup_fiber_f(va_list ap)
{
(void)ap;
while (!fiber_is_cancelled()) {
- int delta = gc.scheduled - gc.completed;
+ int delta = gc.cleanup_scheduled - gc.cleanup_completed;
if (delta == 0) {
/* No pending garbage collection. */
fiber_sleep(TIMEOUT_INFINITY);
continue;
}
assert(delta > 0);
- gc_run();
- gc.completed += delta;
- fiber_cond_signal(&gc.cond);
+ gc_run_cleanup();
+ gc.cleanup_completed += delta;
+ fiber_cond_signal(&gc.cleanup_cond);
}
return 0;
}
@@ -231,7 +231,7 @@ gc_fiber_f(va_list ap)
* Trigger asynchronous garbage collection.
*/
static void
-gc_schedule(void)
+gc_schedule_cleanup(void)
{
/*
* Do not wake up the background fiber if it's executing
@@ -241,8 +241,8 @@ gc_schedule(void)
* then - it will rerun garbage collection as soon as
* the current round completes.
*/
- if (gc.scheduled++ == gc.completed)
- fiber_wakeup(gc.fiber);
+ if (gc.cleanup_scheduled++ == gc.cleanup_completed)
+ fiber_wakeup(gc.cleanup_fiber);
}
/**
@@ -250,11 +250,11 @@ gc_schedule(void)
* to this point to complete.
*/
static void
-gc_wait(void)
+gc_wait_cleanup(void)
{
- unsigned scheduled = gc.scheduled;
- while (gc.completed < scheduled)
- fiber_cond_wait(&gc.cond);
+ unsigned scheduled = gc.cleanup_scheduled;
+ while (gc.cleanup_completed < scheduled)
+ fiber_cond_wait(&gc.cleanup_cond);
}
void
@@ -284,7 +284,7 @@ gc_advance(const struct vclock *vclock)
consumer = next;
}
- gc_schedule();
+ gc_schedule_cleanup();
}
void
@@ -305,7 +305,7 @@ gc_add_checkpoint(const struct vclock *vclock)
* Rerun the garbage collector in this case, just
* in case box.cfg.checkpoint_count has changed.
*/
- gc_schedule();
+ gc_schedule_cleanup();
return;
}
assert(last_checkpoint == NULL ||
@@ -324,7 +324,7 @@ gc_add_checkpoint(const struct vclock *vclock)
rlist_add_tail_entry(&gc.checkpoints, checkpoint, in_checkpoints);
gc.checkpoint_count++;
- gc_schedule();
+ gc_schedule_cleanup();
}
int
@@ -381,7 +381,7 @@ out:
* files have been removed.
*/
if (rc == 0)
- gc_wait();
+ gc_wait_cleanup();
return rc;
}
@@ -402,7 +402,7 @@ void
gc_unref_checkpoint(struct gc_checkpoint_ref *ref)
{
rlist_del_entry(ref, in_refs);
- gc_schedule();
+ gc_schedule_cleanup();
}
struct gc_consumer *
@@ -430,7 +430,7 @@ gc_consumer_unregister(struct gc_consumer *consumer)
{
if (!consumer->is_inactive) {
gc_tree_remove(&gc.consumers, consumer);
- gc_schedule();
+ gc_schedule_cleanup();
}
gc_consumer_delete(consumer);
}
@@ -464,7 +464,7 @@ gc_consumer_advance(struct gc_consumer *consumer, const struct vclock *vclock)
if (update_tree)
gc_tree_insert(&gc.consumers, consumer);
- gc_schedule();
+ gc_schedule_cleanup();
}
struct gc_consumer *
diff --git a/src/box/gc.h b/src/box/gc.h
index 84a441f2..15927726 100644
--- a/src/box/gc.h
+++ b/src/box/gc.h
@@ -123,13 +123,13 @@ struct gc_state {
/** Registered consumers, linked by gc_consumer::node. */
gc_tree_t consumers;
/** Fiber that removes old files in the background. */
- struct fiber *fiber;
+ struct fiber *cleanup_fiber;
/**
- * Condition variable signaled by the background fiber
+ * Condition variable signaled by the cleanup fiber
* whenever it completes a round of garbage collection.
* Used to wait for garbage collection to complete.
*/
- struct fiber_cond cond;
+ struct fiber_cond cleanup_cond;
/**
* The following two members are used for scheduling
* background garbage collection and waiting for it to
@@ -141,7 +141,7 @@ struct gc_state {
* sleep until @completed reaches the value of @scheduled
* taken at that moment of time.
*/
- unsigned completed, scheduled;
+ unsigned cleanup_completed, cleanup_scheduled;
/**
* Set if there's a fiber making a checkpoint right now.
*/
--
2.11.0
* [PATCH v2 07/10] Introduce checkpoint schedule module
2018-12-08 15:48 [PATCH v2 00/10] Allow to limit size of WAL files Vladimir Davydov
` (5 preceding siblings ...)
2018-12-08 15:48 ` [PATCH v2 06/10] gc: some renames Vladimir Davydov
@ 2018-12-08 15:48 ` Vladimir Davydov
2018-12-08 21:45 ` Konstantin Osipov
2018-12-08 15:48 ` [PATCH v2 08/10] Rewrite checkpoint daemon in C Vladimir Davydov
` (3 subsequent siblings)
10 siblings, 1 reply; 22+ messages in thread
From: Vladimir Davydov @ 2018-12-08 15:48 UTC (permalink / raw)
To: kostja; +Cc: tarantool-patches
This is a very simple module that incorporates the logic for calculating
the time of the next scheduled checkpoint given the configured interval
between checkpoints. It doesn't have any dependencies, which allows us to
cover it with a unit test. It will be used by the checkpoint daemon once
we rewrite it in C. Rationale: in the future we might want to introduce
more complex rules for scheduling checkpoints (cron-like, perhaps), and it
will be really nice to have this logic neatly separated and tested.
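A usage sketch of the resulting API (the functions are declared in the
header added below):

	struct checkpoint_schedule sched;
	double now = ev_monotonic_now(loop());

	/*
	 * Checkpoint every 3600 seconds; the start time gets a random
	 * offset to avoid synchronized checkpoints across instances.
	 */
	checkpoint_schedule_cfg(&sched, now, 3600);

	/* Seconds left until the next checkpoint (0 if disabled). */
	double timeout = checkpoint_schedule_timeout(&sched, now);

	/* After an out-of-schedule checkpoint, restart the countdown. */
	checkpoint_schedule_reset(&sched, now);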
---
src/box/CMakeLists.txt | 1 +
src/box/checkpoint_schedule.c | 76 ++++++++++++++++++++++++++++
src/box/checkpoint_schedule.h | 85 +++++++++++++++++++++++++++++++
test/unit/CMakeLists.txt | 6 +++
test/unit/checkpoint_schedule.c | 96 ++++++++++++++++++++++++++++++++++++
test/unit/checkpoint_schedule.result | 41 +++++++++++++++
6 files changed, 305 insertions(+)
create mode 100644 src/box/checkpoint_schedule.c
create mode 100644 src/box/checkpoint_schedule.h
create mode 100644 test/unit/checkpoint_schedule.c
create mode 100644 test/unit/checkpoint_schedule.result
diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt
index d1276472..d7a52c5e 100644
--- a/src/box/CMakeLists.txt
+++ b/src/box/CMakeLists.txt
@@ -106,6 +106,7 @@ add_library(box STATIC
txn.c
box.cc
gc.c
+ checkpoint_schedule.c
user_def.c
user.cc
authentication.cc
diff --git a/src/box/checkpoint_schedule.c b/src/box/checkpoint_schedule.c
new file mode 100644
index 00000000..d37eba7f
--- /dev/null
+++ b/src/box/checkpoint_schedule.c
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2010-2018, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY AUTHORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include "checkpoint_schedule.h"
+
+#include <assert.h>
+#include <math.h>
+#include <stdlib.h>
+
+void
+checkpoint_schedule_cfg(struct checkpoint_schedule *sched,
+ double now, double interval)
+{
+ sched->interval = interval;
+ sched->start_time = now + interval;
+
+ /*
+ * Add a random offset to the start time so as to avoid
+ * simultaneous checkpointing when multiple instances
+ * are running on the same host.
+ */
+ if (interval > 0)
+ sched->start_time += fmod(rand(), interval);
+}
+
+void
+checkpoint_schedule_reset(struct checkpoint_schedule *sched, double now)
+{
+ sched->start_time = now + sched->interval;
+}
+
+double
+checkpoint_schedule_timeout(struct checkpoint_schedule *sched, double now)
+{
+ if (sched->interval <= 0)
+ return 0; /* checkpointing disabled */
+
+ if (now < sched->start_time)
+ return sched->start_time - now;
+
+ /* Time elapsed since the last checkpoint. */
+ double elapsed = fmod(now - sched->start_time, sched->interval);
+
+ /* Time left to the next checkpoint. */
+ double timeout = sched->interval - elapsed;
+
+ assert(timeout > 0);
+ return timeout;
+}
diff --git a/src/box/checkpoint_schedule.h b/src/box/checkpoint_schedule.h
new file mode 100644
index 00000000..7fbbfe2f
--- /dev/null
+++ b/src/box/checkpoint_schedule.h
@@ -0,0 +1,85 @@
+#ifndef TARANTOOL_BOX_CHECKPOINT_SCHEDULE_H_INCLUDED
+#define TARANTOOL_BOX_CHECKPOINT_SCHEDULE_H_INCLUDED
+/*
+ * Copyright 2010-2018, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY AUTHORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#if defined(__cplusplus)
+extern "C" {
+#endif /* defined(__cplusplus) */
+
+struct checkpoint_schedule {
+ /**
+ * Configured interval between checkpoints, in seconds.
+ * Set to 0 if periodic checkpointing is disabled.
+ */
+ double interval;
+ /**
+ * Time of the first scheduled checkpoint. It is used
+ * for calculating times of all subsequent checkpoints.
+ */
+ double start_time;
+};
+
+/**
+ * (Re)configure a checkpoint schedule.
+ *
+ * @now is the current time.
+ * @interval is the configured interval between checkpoints.
+ */
+void
+checkpoint_schedule_cfg(struct checkpoint_schedule *sched,
+ double now, double interval);
+
+/**
+ * Reset a checkpoint schedule.
+ *
+ * Called when a checkpoint is triggered out of the schedule.
+ * Used to adjust the schedule accordingly.
+ *
+ * @now is the current time.
+ */
+void
+checkpoint_schedule_reset(struct checkpoint_schedule *sched, double now);
+
+/**
+ * Return the time to the next scheduled checkpoint, in seconds.
+ * If auto checkpointing is disabled, returns 0.
+ *
+ * @now is the current time.
+ */
+double
+checkpoint_schedule_timeout(struct checkpoint_schedule *sched, double now);
+
+#if defined(__cplusplus)
+} /* extern "C" */
+#endif /* defined(__cplusplus) */
+
+#endif /* TARANTOOL_BOX_CHECKPOINT_SCHEDULE_H_INCLUDED */
diff --git a/test/unit/CMakeLists.txt b/test/unit/CMakeLists.txt
index aef53160..aac86f9e 100644
--- a/test/unit/CMakeLists.txt
+++ b/test/unit/CMakeLists.txt
@@ -195,3 +195,9 @@ target_link_libraries(coll.test core unit ${ICU_LIBRARIES} misc)
add_executable(tuple_bigref.test tuple_bigref.c)
target_link_libraries(tuple_bigref.test tuple unit)
+
+add_executable(checkpoint_schedule.test
+ checkpoint_schedule.c
+ ${PROJECT_SOURCE_DIR}/src/box/checkpoint_schedule.c
+)
+target_link_libraries(checkpoint_schedule.test m unit)
diff --git a/test/unit/checkpoint_schedule.c b/test/unit/checkpoint_schedule.c
new file mode 100644
index 00000000..025c73b1
--- /dev/null
+++ b/test/unit/checkpoint_schedule.c
@@ -0,0 +1,96 @@
+#include <math.h>
+#include <stdbool.h>
+#include <stddef.h>
+#include <stdlib.h>
+#include <time.h>
+
+#include "unit.h"
+#include "checkpoint_schedule.h"
+
+static inline bool
+feq(double a, double b)
+{
+ return fabs(a - b) <= 1;
+}
+
+int
+main()
+{
+ header();
+ plan(38);
+
+ srand(time(NULL));
+ double now = rand();
+
+ struct checkpoint_schedule sched;
+ checkpoint_schedule_cfg(&sched, now, 0);
+
+ is(checkpoint_schedule_timeout(&sched, now), 0,
+ "checkpointing disabled - timeout after configuration");
+
+ now += rand();
+ is(checkpoint_schedule_timeout(&sched, now), 0,
+ "checkpointing disabled - timeout after sleep");
+
+ checkpoint_schedule_reset(&sched, now);
+ is(checkpoint_schedule_timeout(&sched, now), 0,
+ "checkpointing disabled - timeout after reset");
+
+ double intervals[] = { 100, 600, 1200, 1800, 3600, };
+ int intervals_len = sizeof(intervals) / sizeof(intervals[0]);
+ for (int i = 0; i < intervals_len; i++) {
+ double interval = intervals[i];
+
+ checkpoint_schedule_cfg(&sched, now, interval);
+ double t = checkpoint_schedule_timeout(&sched, now);
+ ok(t >= interval && t <= interval * 2,
+ "checkpoint interval %.0lf - timeout after configuration",
+ interval);
+
+ double t0;
+ for (int j = 0; j < 100; j++) {
+ checkpoint_schedule_cfg(&sched, now, interval);
+ t0 = checkpoint_schedule_timeout(&sched, now);
+ if (fabs(t - t0) > interval / 4)
+ break;
+ }
+ ok(fabs(t - t0) > interval / 4,
+ "checkpoint interval %.0lf - initial timeout randomization",
+ interval);
+
+ now += t0 / 2;
+ t = checkpoint_schedule_timeout(&sched, now);
+ ok(feq(t, t0 / 2),
+ "checkpoint interval %.0lf - timeout after sleep 1",
+ interval);
+
+ now += t0 / 2;
+ t = checkpoint_schedule_timeout(&sched, now);
+ ok(feq(t, interval),
+ "checkpoint interval %.0lf - timeout after sleep 2",
+ interval);
+
+ now += interval / 2;
+ t = checkpoint_schedule_timeout(&sched, now);
+ ok(feq(t, interval / 2),
+ "checkpoint interval %.0lf - timeout after sleep 3",
+ interval);
+
+ now += interval;
+ t = checkpoint_schedule_timeout(&sched, now);
+ ok(feq(t, interval / 2),
+ "checkpoint interval %.0lf - timeout after sleep 4",
+ interval);
+
+ checkpoint_schedule_reset(&sched, now);
+ t = checkpoint_schedule_timeout(&sched, now);
+ ok(feq(t, interval),
+ "checkpoint interval %.0lf - timeout after reset",
+ interval);
+ }
+
+ check_plan();
+ footer();
+
+ return 0;
+}
diff --git a/test/unit/checkpoint_schedule.result b/test/unit/checkpoint_schedule.result
new file mode 100644
index 00000000..e34c762a
--- /dev/null
+++ b/test/unit/checkpoint_schedule.result
@@ -0,0 +1,41 @@
+ *** main ***
+1..38
+ok 1 - checkpointing disabled - timeout after configuration
+ok 2 - checkpointing disabled - timeout after sleep
+ok 3 - checkpointing disabled - timeout after reset
+ok 4 - checkpoint interval 100 - timeout after configuration
+ok 5 - checkpoint interval 100 - initial timeout randomization
+ok 6 - checkpoint interval 100 - timeout after sleep 1
+ok 7 - checkpoint interval 100 - timeout after sleep 2
+ok 8 - checkpoint interval 100 - timeout after sleep 3
+ok 9 - checkpoint interval 100 - timeout after sleep 4
+ok 10 - checkpoint interval 100 - timeout after reset
+ok 11 - checkpoint interval 600 - timeout after configuration
+ok 12 - checkpoint interval 600 - initial timeout randomization
+ok 13 - checkpoint interval 600 - timeout after sleep 1
+ok 14 - checkpoint interval 600 - timeout after sleep 2
+ok 15 - checkpoint interval 600 - timeout after sleep 3
+ok 16 - checkpoint interval 600 - timeout after sleep 4
+ok 17 - checkpoint interval 600 - timeout after reset
+ok 18 - checkpoint interval 1200 - timeout after configuration
+ok 19 - checkpoint interval 1200 - initial timeout randomization
+ok 20 - checkpoint interval 1200 - timeout after sleep 1
+ok 21 - checkpoint interval 1200 - timeout after sleep 2
+ok 22 - checkpoint interval 1200 - timeout after sleep 3
+ok 23 - checkpoint interval 1200 - timeout after sleep 4
+ok 24 - checkpoint interval 1200 - timeout after reset
+ok 25 - checkpoint interval 1800 - timeout after configuration
+ok 26 - checkpoint interval 1800 - initial timeout randomization
+ok 27 - checkpoint interval 1800 - timeout after sleep 1
+ok 28 - checkpoint interval 1800 - timeout after sleep 2
+ok 29 - checkpoint interval 1800 - timeout after sleep 3
+ok 30 - checkpoint interval 1800 - timeout after sleep 4
+ok 31 - checkpoint interval 1800 - timeout after reset
+ok 32 - checkpoint interval 3600 - timeout after configuration
+ok 33 - checkpoint interval 3600 - initial timeout randomization
+ok 34 - checkpoint interval 3600 - timeout after sleep 1
+ok 35 - checkpoint interval 3600 - timeout after sleep 2
+ok 36 - checkpoint interval 3600 - timeout after sleep 3
+ok 37 - checkpoint interval 3600 - timeout after sleep 4
+ok 38 - checkpoint interval 3600 - timeout after reset
+ *** main: done ***
--
2.11.0
* [PATCH v2 08/10] Rewrite checkpoint daemon in C
2018-12-08 15:48 [PATCH v2 00/10] Allow to limit size of WAL files Vladimir Davydov
` (6 preceding siblings ...)
2018-12-08 15:48 ` [PATCH v2 07/10] Introduce checkpoint schedule module Vladimir Davydov
@ 2018-12-08 15:48 ` Vladimir Davydov
2018-12-08 21:45 ` Konstantin Osipov
2018-12-08 15:48 ` [PATCH v2 09/10] wal: pass struct instead of vclock to checkpoint methods Vladimir Davydov
` (2 subsequent siblings)
10 siblings, 1 reply; 22+ messages in thread
From: Vladimir Davydov @ 2018-12-08 15:48 UTC (permalink / raw)
To: kostja; +Cc: tarantool-patches
A long time ago, when the checkpoint daemon was added to Tarantool, it was
responsible not only for making periodic checkpoints, but also for
maintaining the configured number of checkpoints and removing old snap
and xlog files, so it was much easier to implement it in Lua than in C.
However, over time, all its responsibilities have been reimplemented in
C and moved to the server code so that now it just calls box.snapshot()
periodically. Let's rewrite this simple procedure in C as well - this
will allow us to easily add more complex logic there, e.g. triggering a
checkpoint when the total size of WAL files exceeds a configured threshold.
Note, this patch removes a few cases from xlog/checkpoint_daemon test
that tested the internal state of the checkpoint daemon, which isn't
available in Lua anymore. This is OK as those cases are covered by
unit/checkpoint_schedule test.
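The resulting daemon fiber is roughly a loop of the following shape. This
is a sketch based on the description above; the actual
gc_checkpoint_fiber_f in the diff below also has to handle wakeups caused
by reconfiguration (gc_set_checkpoint_interval() wakes the fiber early):

	static int
	gc_checkpoint_fiber_f(va_list ap)
	{
		(void)ap;
		struct checkpoint_schedule *sched = &gc.checkpoint_schedule;
		while (!fiber_is_cancelled()) {
			double now = ev_monotonic_now(loop());
			double timeout = checkpoint_schedule_timeout(sched, now);
			if (timeout <= 0) {
				/* Periodic checkpointing is disabled. */
				fiber_sleep(TIMEOUT_INFINITY);
				continue;
			}
			fiber_sleep(timeout);
			/*
			 * NB: the real loop must re-check the schedule
			 * here, since the sleep may have been cut short
			 * by a reconfiguration wakeup.
			 */
			if (gc_checkpoint() != 0)
				diag_log();
		}
		return 0;
	}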
---
src/box/CMakeLists.txt | 1 -
src/box/box.cc | 7 ++
src/box/box.h | 1 +
src/box/gc.c | 105 ++++++++++++++++++++++---
src/box/gc.h | 12 +++
src/box/lua/cfg.cc | 12 +++
src/box/lua/checkpoint_daemon.lua | 136 ---------------------------------
src/box/lua/init.c | 2 -
src/box/lua/load_cfg.lua | 2 +-
test/xlog/checkpoint_daemon.result | 143 +++--------------------------------
test/xlog/checkpoint_daemon.test.lua | 61 ++-------------
11 files changed, 146 insertions(+), 336 deletions(-)
delete mode 100644 src/box/lua/checkpoint_daemon.lua
diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt
index d7a52c5e..5260092f 100644
--- a/src/box/CMakeLists.txt
+++ b/src/box/CMakeLists.txt
@@ -7,7 +7,6 @@ lua_source(lua_sources lua/load_cfg.lua)
lua_source(lua_sources lua/schema.lua)
lua_source(lua_sources lua/tuple.lua)
lua_source(lua_sources lua/session.lua)
-lua_source(lua_sources lua/checkpoint_daemon.lua)
lua_source(lua_sources lua/feedback_daemon.lua)
lua_source(lua_sources lua/net_box.lua)
lua_source(lua_sources lua/upgrade.lua)
diff --git a/src/box/box.cc b/src/box/box.cc
index 121ad787..771f2b8c 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -851,6 +851,13 @@ box_set_checkpoint_count(void)
}
void
+box_set_checkpoint_interval(void)
+{
+ double interval = cfg_getd("checkpoint_interval");
+ gc_set_checkpoint_interval(interval);
+}
+
+void
box_set_vinyl_memory(void)
{
struct vinyl_engine *vinyl;
diff --git a/src/box/box.h b/src/box/box.h
index 6de0691d..91e41a9d 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -194,6 +194,7 @@ void box_set_snap_io_rate_limit(void);
void box_set_too_long_threshold(void);
void box_set_readahead(void);
void box_set_checkpoint_count(void);
+void box_set_checkpoint_interval(void);
void box_set_memtx_memory(void);
void box_set_memtx_max_tuple_size(void);
void box_set_vinyl_memory(void);
diff --git a/src/box/gc.c b/src/box/gc.c
index e1b23eed..e8074078 100644
--- a/src/box/gc.c
+++ b/src/box/gc.c
@@ -39,10 +39,12 @@
#include <stdint.h>
#include <stdlib.h>
#include <stdio.h>
+#include <time.h>
#define RB_COMPACT 1
#include <small/rb.h>
#include <small/rlist.h>
+#include <tarantool_ev.h>
#include "diag.h"
#include "errcode.h"
@@ -55,11 +57,14 @@
#include "schema.h"
#include "engine.h" /* engine_collect_garbage() */
#include "wal.h" /* wal_collect_garbage() */
+#include "checkpoint_schedule.h"
struct gc_state gc;
static int
gc_cleanup_fiber_f(va_list);
+static int
+gc_checkpoint_fiber_f(va_list);
/**
* Comparator used for ordering gc_consumer objects by signature
@@ -108,12 +113,19 @@ gc_init(void)
rlist_create(&gc.checkpoints);
gc_tree_new(&gc.consumers);
fiber_cond_create(&gc.cleanup_cond);
+ checkpoint_schedule_cfg(&gc.checkpoint_schedule, 0, 0);
gc.cleanup_fiber = fiber_new("gc", gc_cleanup_fiber_f);
if (gc.cleanup_fiber == NULL)
panic("failed to start garbage collection fiber");
+ gc.checkpoint_fiber = fiber_new("checkpoint_daemon",
+ gc_checkpoint_fiber_f);
+ if (gc.checkpoint_fiber == NULL)
+ panic("failed to start checkpoint daemon fiber");
+
fiber_start(gc.cleanup_fiber);
+ fiber_start(gc.checkpoint_fiber);
}
void
@@ -294,6 +306,18 @@ gc_set_min_checkpoint_count(int min_checkpoint_count)
}
void
+gc_set_checkpoint_interval(double interval)
+{
+ /*
+ * Reconfigure the schedule and wake up the checkpoint
+ * daemon so that it can readjust.
+ */
+ checkpoint_schedule_cfg(&gc.checkpoint_schedule,
+ ev_monotonic_now(loop()), interval);
+ fiber_wakeup(gc.checkpoint_fiber);
+}
+
+void
gc_add_checkpoint(const struct vclock *vclock)
{
struct gc_checkpoint *last_checkpoint = gc_last_checkpoint();
@@ -327,16 +351,13 @@ gc_add_checkpoint(const struct vclock *vclock)
gc_schedule_cleanup();
}
-int
-gc_checkpoint(void)
+static int
+gc_do_checkpoint(void)
{
int rc;
struct vclock vclock;
- if (gc.checkpoint_is_in_progress) {
- diag_set(ClientError, ER_CHECKPOINT_IN_PROGRESS);
- return -1;
- }
+ assert(!gc.checkpoint_is_in_progress);
gc.checkpoint_is_in_progress = true;
/*
@@ -371,6 +392,27 @@ out:
latch_unlock(&schema_lock);
gc.checkpoint_is_in_progress = false;
+ return rc;
+}
+
+int
+gc_checkpoint(void)
+{
+ if (gc.checkpoint_is_in_progress) {
+ diag_set(ClientError, ER_CHECKPOINT_IN_PROGRESS);
+ return -1;
+ }
+
+ /*
+ * Reset the schedule and wake up the checkpoint daemon
+ * so that it can readjust.
+ */
+ checkpoint_schedule_reset(&gc.checkpoint_schedule,
+ ev_monotonic_now(loop()));
+ fiber_wakeup(gc.checkpoint_fiber);
+
+ if (gc_do_checkpoint() != 0)
+ return -1;
/*
* Wait for background garbage collection that might
@@ -380,10 +422,55 @@ out:
* time box.snapshot() returns, all outdated checkpoint
* files have been removed.
*/
- if (rc == 0)
- gc_wait_cleanup();
+ gc_wait_cleanup();
+ return 0;
+}
- return rc;
+static int
+gc_checkpoint_fiber_f(va_list ap)
+{
+ (void)ap;
+
+ /*
+ * Make the fiber non-cancellable so as not to bother
+ * about spurious wakeups.
+ */
+ fiber_set_cancellable(false);
+
+ struct checkpoint_schedule *sched = &gc.checkpoint_schedule;
+ while (!fiber_is_cancelled()) {
+ double timeout = checkpoint_schedule_timeout(sched,
+ ev_monotonic_now(loop()));
+ if (timeout > 0) {
+ char buf[128];
+ struct tm tm;
+ time_t time = (time_t)(ev_now(loop()) + timeout);
+ localtime_r(&time, &tm);
+ strftime(buf, sizeof(buf), "%c", &tm);
+ say_info("scheduled next checkpoint for %s", buf);
+ } else {
+ /* Periodic checkpointing is disabled. */
+ timeout = TIMEOUT_INFINITY;
+ }
+ if (!fiber_yield_timeout(timeout)) {
+ /*
+ * The checkpoint schedule has changed.
+ * Reschedule the next checkpoint.
+ */
+ continue;
+ }
+ /* Time to make the next scheduled checkpoint. */
+ if (gc.checkpoint_is_in_progress) {
+ /*
+ * Another fiber is making a checkpoint.
+ * Skip this one.
+ */
+ continue;
+ }
+ if (gc_do_checkpoint() != 0)
+ diag_log();
+ }
+ return 0;
}
void
diff --git a/src/box/gc.h b/src/box/gc.h
index 15927726..ffbafd34 100644
--- a/src/box/gc.h
+++ b/src/box/gc.h
@@ -38,6 +38,7 @@
#include "fiber_cond.h"
#include "vclock.h"
#include "trivia/util.h"
+#include "checkpoint_schedule.h"
#if defined(__cplusplus)
extern "C" {
@@ -122,6 +123,10 @@ struct gc_state {
struct rlist checkpoints;
/** Registered consumers, linked by gc_consumer::node. */
gc_tree_t consumers;
+ /** Fiber responsible for periodic checkpointing. */
+ struct fiber *checkpoint_fiber;
+ /** Schedule of periodic checkpoints. */
+ struct checkpoint_schedule checkpoint_schedule;
/** Fiber that removes old files in the background. */
struct fiber *cleanup_fiber;
/**
@@ -215,6 +220,13 @@ void
gc_set_min_checkpoint_count(int min_checkpoint_count);
/**
+ * Set the time interval between checkpoints, in seconds.
+ * Setting the interval to 0 disables periodic checkpointing.
+ */
+void
+gc_set_checkpoint_interval(double interval);
+
+/**
* Track an existing checkpoint in the garbage collector state.
* Note, this function may trigger garbage collection to remove
* old checkpoints.
diff --git a/src/box/lua/cfg.cc b/src/box/lua/cfg.cc
index c3825591..4f08c78e 100644
--- a/src/box/lua/cfg.cc
+++ b/src/box/lua/cfg.cc
@@ -165,6 +165,17 @@ lbox_cfg_set_checkpoint_count(struct lua_State *L)
}
static int
+lbox_cfg_set_checkpoint_interval(struct lua_State *L)
+{
+ try {
+ box_set_checkpoint_interval();
+ } catch (Exception *) {
+ luaT_error(L);
+ }
+ return 0;
+}
+
+static int
lbox_cfg_set_read_only(struct lua_State *L)
{
try {
@@ -340,6 +351,7 @@ box_lua_cfg_init(struct lua_State *L)
{"cfg_set_too_long_threshold", lbox_cfg_set_too_long_threshold},
{"cfg_set_snap_io_rate_limit", lbox_cfg_set_snap_io_rate_limit},
{"cfg_set_checkpoint_count", lbox_cfg_set_checkpoint_count},
+ {"cfg_set_checkpoint_interval", lbox_cfg_set_checkpoint_interval},
{"cfg_set_read_only", lbox_cfg_set_read_only},
{"cfg_set_memtx_memory", lbox_cfg_set_memtx_memory},
{"cfg_set_memtx_max_tuple_size", lbox_cfg_set_memtx_max_tuple_size},
diff --git a/src/box/lua/checkpoint_daemon.lua b/src/box/lua/checkpoint_daemon.lua
deleted file mode 100644
index 576c4a5c..00000000
--- a/src/box/lua/checkpoint_daemon.lua
+++ /dev/null
@@ -1,136 +0,0 @@
--- checkpoint_daemon.lua (internal file)
-
-local log = require 'log'
-local fiber = require 'fiber'
-local fio = require 'fio'
-local yaml = require 'yaml'
-local errno = require 'errno'
-local digest = require 'digest'
-local pickle = require 'pickle'
-
-local PREFIX = 'checkpoint_daemon'
-
-local daemon = {
- checkpoint_interval = 0;
- fiber = nil;
- control = nil;
-}
-
--- create snapshot, return true if no errors
-local function snapshot()
- log.info("making snapshot...")
- local s, e = pcall(function() box.snapshot() end)
- if s then
- return true
- end
- -- don't complain in the log if the snapshot already exists
- if errno() == errno.EEXIST then
- return false
- end
- log.error("error while creating snapshot: %s", e)
- return false
-end
-
--- check filesystem and current time
-local function process(self)
-
- if daemon.checkpoint_interval == nil then
- return false
- end
-
- if not(daemon.checkpoint_interval > 0) then
- return false
- end
-
- local checkpoints = box.info.gc().checkpoints
- local last_checkpoint = checkpoints[#checkpoints]
-
- local last_snap = fio.pathjoin(box.cfg.memtx_dir,
- string.format('%020d.snap', last_checkpoint.signature))
- local snstat = fio.stat(last_snap)
- if snstat == nil then
- log.error("can't stat %s: %s", last_snap, errno.strerror())
- return false
- end
- if snstat.mtime + daemon.checkpoint_interval <= fiber.time() then
- return snapshot()
- end
-end
-
-local function daemon_fiber(self)
- fiber.name(PREFIX, {truncate = true})
- log.info("started")
-
- --
- -- Add random offset to the initial period to avoid simultaneous
- -- snapshotting when multiple instances of tarantool are running
- -- on the same host.
- -- See https://github.com/tarantool/tarantool/issues/732
- --
- local random = pickle.unpack('i', digest.urandom(4))
- local offset = random % self.checkpoint_interval
- while true do
- local period = self.checkpoint_interval + offset
- -- maintain next_snapshot_time as a self member for testing purposes
- self.next_snapshot_time = fiber.time() + period
- log.info("scheduled the next snapshot at %s",
- os.date("%c", self.next_snapshot_time))
- local msg = self.control:get(period)
- if msg == 'shutdown' then
- break
- elseif msg == 'reload' then
- offset = random % self.checkpoint_interval
- log.info("reloaded") -- continue
- elseif msg == nil and box.info.status == 'running' then
- local s, e = pcall(process, self)
- if not s then
- log.error(e)
- end
- offset = 0
- end
- end
- self.next_snapshot_time = nil
- log.info("stopped")
-end
-
-local function reload(self)
- if self.checkpoint_interval > 0 then
- if self.control == nil then
- -- Start daemon
- self.control = fiber.channel()
- self.fiber = fiber.create(daemon_fiber, self)
- fiber.sleep(0)
- else
- -- Reload daemon
- self.control:put("reload")
- --
- -- channel:put() doesn't block the writer if there
- -- is a ready reader. Give daemon fiber way so that
- -- it can execute before reload() returns to the caller.
- --
- fiber.sleep(0)
- end
- elseif self.control ~= nil then
- -- Shutdown daemon
- self.control:put("shutdown")
- self.fiber = nil
- self.control = nil
- fiber.sleep(0) -- see comment above
- end
-end
-
-setmetatable(daemon, {
- __index = {
- set_checkpoint_interval = function()
- daemon.checkpoint_interval = box.cfg.checkpoint_interval
- reload(daemon)
- return
- end,
- }
-})
-
-if box.internal == nil then
- box.internal = { [PREFIX] = daemon }
-else
- box.internal[PREFIX] = daemon
-end
diff --git a/src/box/lua/init.c b/src/box/lua/init.c
index ccb4c6a4..0e90f6be 100644
--- a/src/box/lua/init.c
+++ b/src/box/lua/init.c
@@ -65,7 +65,6 @@ extern char session_lua[],
schema_lua[],
load_cfg_lua[],
xlog_lua[],
- checkpoint_daemon_lua[],
feedback_daemon_lua[],
net_box_lua[],
upgrade_lua[],
@@ -75,7 +74,6 @@ static const char *lua_sources[] = {
"box/session", session_lua,
"box/tuple", tuple_lua,
"box/schema", schema_lua,
- "box/checkpoint_daemon", checkpoint_daemon_lua,
"box/feedback_daemon", feedback_daemon_lua,
"box/upgrade", upgrade_lua,
"box/net_box", net_box_lua,
diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua
index 38e742c8..321fd3ad 100644
--- a/src/box/lua/load_cfg.lua
+++ b/src/box/lua/load_cfg.lua
@@ -227,7 +227,7 @@ local dynamic_cfg = {
vinyl_cache = private.cfg_set_vinyl_cache,
vinyl_timeout = private.cfg_set_vinyl_timeout,
checkpoint_count = private.cfg_set_checkpoint_count,
- checkpoint_interval = private.checkpoint_daemon.set_checkpoint_interval,
+ checkpoint_interval = private.cfg_set_checkpoint_interval,
worker_pool_threads = private.cfg_set_worker_pool_threads,
feedback_enabled = private.feedback_daemon.set_feedback_params,
feedback_host = private.feedback_daemon.set_feedback_params,
diff --git a/test/xlog/checkpoint_daemon.result b/test/xlog/checkpoint_daemon.result
index 3a75137d..6c96da0d 100644
--- a/test/xlog/checkpoint_daemon.result
+++ b/test/xlog/checkpoint_daemon.result
@@ -16,6 +16,12 @@ test_run = env.new()
test_run:cleanup_cluster()
---
...
+default_checkpoint_count = box.cfg.checkpoint_count
+---
+...
+default_checkpoint_interval = box.cfg.checkpoint_interval
+---
+...
box.cfg{checkpoint_interval = 0}
---
...
@@ -144,47 +150,6 @@ test_run:cmd("setopt delimiter ''");
---
- true
...
--- restore default options
-box.cfg{checkpoint_interval = 3600 * 4, checkpoint_count = 4 }
----
-...
-space:drop()
----
-...
-daemon = box.internal.checkpoint_daemon
----
-...
--- stop daemon
-box.cfg{ checkpoint_interval = 0 }
----
-...
--- wait daemon to stop
-while daemon.fiber ~= nil do fiber.sleep(0) end
----
-...
-daemon.fiber == nil
----
-- true
-...
--- start daemon
-box.cfg{ checkpoint_interval = 10 }
----
-...
-daemon.fiber ~= nil
----
-- true
-...
--- reload configuration
-box.cfg{ checkpoint_interval = 15, checkpoint_count = 20 }
----
-...
-daemon.checkpoint_interval == 15
----
-- true
-...
-daemon.checkpoint_count = 20
----
-...
-- Check that checkpoint_count can't be < 1.
box.cfg{ checkpoint_count = 1 }
---
@@ -198,101 +163,13 @@ box.cfg.checkpoint_count
---
- 1
...
--- Start
-PERIOD = 3600
----
-...
-box.cfg{ checkpoint_count = 2, checkpoint_interval = PERIOD}
----
-...
-snapshot_time, time = daemon.next_snapshot_time, fiber.time()
----
-...
-snapshot_time + 1 >= time + PERIOD or {snapshot_time, time, PERIOD}
----
-- true
-...
-snapshot_time - 1 <= time + 2 * PERIOD or {snapshot_time, time, PERIOD}
----
-- true
-...
-daemon_fiber = daemon.fiber
----
-...
-daemon_control = daemon.control
----
-...
--- Reload #1
-PERIOD = 100
+-- Restore default options.
+box.cfg{checkpoint_count = default_checkpoint_count}
---
...
-box.cfg{ checkpoint_count = 2, checkpoint_interval = PERIOD}
+box.cfg{checkpoint_interval = default_checkpoint_interval}
---
...
-snapshot_time, time = daemon.next_snapshot_time, fiber.time()
----
-...
-snapshot_time + 1 >= time + PERIOD or {snapshot_time, time, PERIOD}
----
-- true
-...
-snapshot_time - 1 <= time + 2 * PERIOD or {snapshot_time, time, PERIOD}
----
-- true
-...
-daemon.fiber == daemon_fiber
----
-- true
-...
-daemon.control == daemon_control
----
-- true
-...
--- Reload #2
-PERIOD = 1000
----
-...
-box.cfg{ checkpoint_count = 2, checkpoint_interval = PERIOD}
----
-...
-snapshot_time, time = daemon.next_snapshot_time, fiber.time()
----
-...
-snapshot_time + 1 >= time + PERIOD or {snapshot_time, time, PERIOD}
----
-- true
-...
-snapshot_time - 1 <= time + 2 * PERIOD or {snapshot_time, time, PERIOD}
----
-- true
-...
-daemon.fiber == daemon_fiber
----
-- true
-...
-daemon.control == daemon_control
----
-- true
-...
-daemon_control = nil
----
-...
-daemin_fiber = nil
----
-...
--- Shutdown
-box.cfg{ checkpoint_count = 2, checkpoint_interval = 0}
----
-...
-daemon.next_snapshot_time
----
-- null
-...
-daemon.fiber == nil
----
-- true
-...
-daemon.control == nil
+space:drop()
---
-- true
...
diff --git a/test/xlog/checkpoint_daemon.test.lua b/test/xlog/checkpoint_daemon.test.lua
index f3490621..37d7f752 100644
--- a/test/xlog/checkpoint_daemon.test.lua
+++ b/test/xlog/checkpoint_daemon.test.lua
@@ -6,6 +6,8 @@ test_run = env.new()
test_run:cleanup_cluster()
+default_checkpoint_count = box.cfg.checkpoint_count
+default_checkpoint_interval = box.cfg.checkpoint_interval
box.cfg{checkpoint_interval = 0}
PERIOD = jit.os == 'Linux' and 0.03 or 1.5
@@ -85,62 +87,13 @@ test_run:wait_cond(function()
end, WAIT_COND_TIMEOUT);
test_run:cmd("setopt delimiter ''");
--- restore default options
-box.cfg{checkpoint_interval = 3600 * 4, checkpoint_count = 4 }
-space:drop()
-
-daemon = box.internal.checkpoint_daemon
--- stop daemon
-box.cfg{ checkpoint_interval = 0 }
--- wait daemon to stop
-while daemon.fiber ~= nil do fiber.sleep(0) end
-daemon.fiber == nil
--- start daemon
-box.cfg{ checkpoint_interval = 10 }
-daemon.fiber ~= nil
--- reload configuration
-box.cfg{ checkpoint_interval = 15, checkpoint_count = 20 }
-daemon.checkpoint_interval == 15
-daemon.checkpoint_count = 20
-
-- Check that checkpoint_count can't be < 1.
box.cfg{ checkpoint_count = 1 }
box.cfg{ checkpoint_count = 0 }
box.cfg.checkpoint_count
--- Start
-PERIOD = 3600
-box.cfg{ checkpoint_count = 2, checkpoint_interval = PERIOD}
-snapshot_time, time = daemon.next_snapshot_time, fiber.time()
-snapshot_time + 1 >= time + PERIOD or {snapshot_time, time, PERIOD}
-snapshot_time - 1 <= time + 2 * PERIOD or {snapshot_time, time, PERIOD}
-
-daemon_fiber = daemon.fiber
-daemon_control = daemon.control
-
--- Reload #1
-PERIOD = 100
-box.cfg{ checkpoint_count = 2, checkpoint_interval = PERIOD}
-snapshot_time, time = daemon.next_snapshot_time, fiber.time()
-snapshot_time + 1 >= time + PERIOD or {snapshot_time, time, PERIOD}
-snapshot_time - 1 <= time + 2 * PERIOD or {snapshot_time, time, PERIOD}
-daemon.fiber == daemon_fiber
-daemon.control == daemon_control
-
--- Reload #2
-PERIOD = 1000
-box.cfg{ checkpoint_count = 2, checkpoint_interval = PERIOD}
-snapshot_time, time = daemon.next_snapshot_time, fiber.time()
-snapshot_time + 1 >= time + PERIOD or {snapshot_time, time, PERIOD}
-snapshot_time - 1 <= time + 2 * PERIOD or {snapshot_time, time, PERIOD}
-daemon.fiber == daemon_fiber
-daemon.control == daemon_control
-
-daemon_control = nil
-daemin_fiber = nil
-
--- Shutdown
-box.cfg{ checkpoint_count = 2, checkpoint_interval = 0}
-daemon.next_snapshot_time
-daemon.fiber == nil
-daemon.control == nil
+-- Restore default options.
+box.cfg{checkpoint_count = default_checkpoint_count}
+box.cfg{checkpoint_interval = default_checkpoint_interval}
+
+space:drop()
--
2.11.0
^ permalink raw reply [flat|nested] 22+ messages in thread
* [PATCH v2 09/10] wal: pass struct instead of vclock to checkpoint methods
2018-12-08 15:48 [PATCH v2 00/10] Allow to limit size of WAL files Vladimir Davydov
` (7 preceding siblings ...)
2018-12-08 15:48 ` [PATCH v2 08/10] Rewrite checkpoint daemon in C Vladimir Davydov
@ 2018-12-08 15:48 ` Vladimir Davydov
2018-12-08 21:46 ` Konstantin Osipov
2018-12-08 15:48 ` [PATCH v2 10/10] wal: trigger checkpoint if there are too many WALs Vladimir Davydov
2018-12-09 11:20 ` [PATCH v2 00/10] Allow to limit size of WAL files Vladimir Davydov
10 siblings, 1 reply; 22+ messages in thread
From: Vladimir Davydov @ 2018-12-08 15:48 UTC (permalink / raw)
To: kostja; +Cc: tarantool-patches
Currently, we only need to pass a vclock between TX and WAL during
checkpointing. However, in order to implement auto-checkpointing
triggered when the WAL size exceeds a certain threshold, we will need to
pass some extra info so that we can properly reset the counter that
accounts for the WAL size in the WAL thread. To make this possible, let's
move the wal_checkpoint struct, which is used internally by WAL to pass a
checkpoint vclock, to the header and require the caller to pass it to
wal_begin/commit_checkpoint instead of just a vclock.
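Condensed from the gc_do_checkpoint() hunk below, the TX-side calling
convention after this change looks roughly as follows (a sketch only:
the helper name is hypothetical and the engine_abort_checkpoint() error
path is trimmed):

	static int
	do_checkpoint_sketch(void)
	{
		struct wal_checkpoint checkpoint;
		if (engine_begin_checkpoint() != 0)
			return -1;
		/* The vclock now comes back inside the struct. */
		if (wal_begin_checkpoint(&checkpoint) != 0)
			return -1;
		if (engine_commit_checkpoint(&checkpoint.vclock) != 0)
			return -1;
		/* The same struct is handed back so WAL can use the
		 * extra info it carries. */
		wal_commit_checkpoint(&checkpoint);
		return 0;
	}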
---
src/box/gc.c | 10 +++++-----
src/box/wal.c | 22 ++++++----------------
src/box/wal.h | 20 +++++++++++++++-----
3 files changed, 26 insertions(+), 26 deletions(-)
diff --git a/src/box/gc.c b/src/box/gc.c
index e8074078..6a7e371f 100644
--- a/src/box/gc.c
+++ b/src/box/gc.c
@@ -355,7 +355,7 @@ static int
gc_do_checkpoint(void)
{
int rc;
- struct vclock vclock;
+ struct wal_checkpoint checkpoint;
assert(!gc.checkpoint_is_in_progress);
gc.checkpoint_is_in_progress = true;
@@ -373,19 +373,19 @@ gc_do_checkpoint(void)
rc = engine_begin_checkpoint();
if (rc != 0)
goto out;
- rc = wal_begin_checkpoint(&vclock);
+ rc = wal_begin_checkpoint(&checkpoint);
if (rc != 0)
goto out;
- rc = engine_commit_checkpoint(&vclock);
+ rc = engine_commit_checkpoint(&checkpoint.vclock);
if (rc != 0)
goto out;
- wal_commit_checkpoint(&vclock);
+ wal_commit_checkpoint(&checkpoint);
/*
* Finally, track the newly created checkpoint in the garbage
* collector state.
*/
- gc_add_checkpoint(&vclock);
+ gc_add_checkpoint(&checkpoint.vclock);
out:
if (rc != 0)
engine_abort_checkpoint();
diff --git a/src/box/wal.c b/src/box/wal.c
index 644f58c8..8e56e6ae 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -497,12 +497,6 @@ wal_sync(void)
cbus_flush(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe, NULL);
}
-struct wal_checkpoint
-{
- struct cbus_call_msg base;
- struct vclock vclock;
-};
-
static int
wal_begin_checkpoint_f(struct cbus_call_msg *data)
{
@@ -534,11 +528,11 @@ wal_begin_checkpoint_f(struct cbus_call_msg *data)
}
int
-wal_begin_checkpoint(struct vclock *vclock)
+wal_begin_checkpoint(struct wal_checkpoint *checkpoint)
{
struct wal_writer *writer = &wal_writer_singleton;
if (writer->wal_mode == WAL_NONE) {
- vclock_copy(vclock, &writer->vclock);
+ vclock_copy(&checkpoint->vclock, &writer->vclock);
return 0;
}
if (!stailq_empty(&writer->rollback)) {
@@ -552,15 +546,13 @@ wal_begin_checkpoint(struct vclock *vclock)
diag_set(ClientError, ER_CHECKPOINT_ROLLBACK);
return -1;
}
- struct wal_checkpoint msg;
bool cancellable = fiber_set_cancellable(false);
int rc = cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe,
- &msg.base, wal_begin_checkpoint_f, NULL,
+ &checkpoint->base, wal_begin_checkpoint_f, NULL,
TIMEOUT_INFINITY);
fiber_set_cancellable(cancellable);
if (rc != 0)
return -1;
- vclock_copy(vclock, &msg.vclock);
return 0;
}
@@ -574,18 +566,16 @@ wal_commit_checkpoint_f(struct cbus_call_msg *data)
}
void
-wal_commit_checkpoint(const struct vclock *vclock)
+wal_commit_checkpoint(struct wal_checkpoint *checkpoint)
{
struct wal_writer *writer = &wal_writer_singleton;
if (writer->wal_mode == WAL_NONE) {
- vclock_copy(&writer->checkpoint_vclock, vclock);
+ vclock_copy(&writer->checkpoint_vclock, &checkpoint->vclock);
return;
}
- struct wal_checkpoint msg;
- vclock_copy(&msg.vclock, vclock);
bool cancellable = fiber_set_cancellable(false);
cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe,
- &msg.base, wal_commit_checkpoint_f, NULL,
+ &checkpoint->base, wal_commit_checkpoint_f, NULL,
TIMEOUT_INFINITY);
fiber_set_cancellable(cancellable);
}
diff --git a/src/box/wal.h b/src/box/wal.h
index 3d616353..2564f718 100644
--- a/src/box/wal.h
+++ b/src/box/wal.h
@@ -160,16 +160,26 @@ wal_mode();
void
wal_sync(void);
+struct wal_checkpoint {
+ struct cbus_call_msg base;
+ /**
+ * VClock of the last record written to the rotated WAL.
+ * This is the vclock that is supposed to be used to
+ * identify the new checkpoint.
+ */
+ struct vclock vclock;
+};
+
/**
* Prepare WAL for checkpointing.
*
* This function flushes all pending changes and rotates the
- * current WAL. The vclock of the last record written to the
- * rotated WAL is returned in @vclock. This is the vclock that
- * is supposed to be used to identify the new checkpoint.
+ * current WAL. Checkpoint info is returned in @checkpoint.
+ * It is supposed to be passed to wal_commit_checkpoint()
+ * upon successful checkpoint creation.
*/
int
-wal_begin_checkpoint(struct vclock *vclock);
+wal_begin_checkpoint(struct wal_checkpoint *checkpoint);
/**
* This function is called upon successful checkpoint creation.
@@ -177,7 +187,7 @@ wal_begin_checkpoint(struct vclock *vclock);
* vclock.
*/
void
-wal_commit_checkpoint(const struct vclock *vclock);
+wal_commit_checkpoint(struct wal_checkpoint *checkpoint);
/**
* Remove WAL files that are not needed by consumers reading
--
2.11.0
^ permalink raw reply [flat|nested] 22+ messages in thread
* [PATCH v2 10/10] wal: trigger checkpoint if there are too many WALs
2018-12-08 15:48 [PATCH v2 00/10] Allow to limit size of WAL files Vladimir Davydov
` (8 preceding siblings ...)
2018-12-08 15:48 ` [PATCH v2 09/10] wal: pass struct instead of vclock to checkpoint methods Vladimir Davydov
@ 2018-12-08 15:48 ` Vladimir Davydov
2018-12-08 21:48 ` Konstantin Osipov
2018-12-09 11:20 ` [PATCH v2 00/10] Allow to limit size of WAL files Vladimir Davydov
10 siblings, 1 reply; 22+ messages in thread
From: Vladimir Davydov @ 2018-12-08 15:48 UTC (permalink / raw)
To: kostja; +Cc: tarantool-patches
Closes #1082
@TarantoolBot document
Title: Document box.cfg.checkpoint_wal_threshold
Tarantool makes checkpoints every box.cfg.checkpoint_interval seconds
and keeps the last box.cfg.checkpoint_count checkpoints. It also keeps all
intermediate WAL files. Currently, it isn't possible to set a checkpoint
trigger based on the total size of WAL files, which makes it difficult to
estimate the minimal amount of disk space that needs to be allotted to a
Tarantool instance for storing WALs to eliminate the possibility of
ENOSPC errors. For example, under normal conditions a Tarantool instance
may write 1 GB of WAL files every box.cfg.checkpoint_interval seconds
and so one may assume that 1 GB times box.cfg.checkpoint_count should be
enough for the WAL partition, but there's no guarantee it won't write 10
GB between checkpoints when the load is extreme.
So we've agreed that we must provide users with one more configuration
option that can be used to impose a limit on the total size of WAL
files. The new option is called box.cfg.checkpoint_wal_threshold. Once
the configured threshold is exceeded, the WAL thread notifies the
checkpoint daemon that it's time to make a new checkpoint and delete
old WAL files. Note, the new option only limits the size of WAL files
created since the last checkpoint, because backup WAL files are not
needed for recovery and can be deleted in case of an emergency ENOSPC;
for more details, see tarantool/tarantool#1082, tarantool/tarantool#3397,
and tarantool/tarantool#3822.
The default value of the new option is 1 exabyte (10^18 bytes), which
effectively means that the feature is disabled.
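A usage sketch (the 512 MB figure is arbitrary, chosen for illustration
only) - the option is set like any other dynamic box.cfg parameter:

    -- Trigger a checkpoint once WALs written since the last checkpoint
    -- exceed ~512 MB.
    box.cfg{checkpoint_wal_threshold = 512 * 1024 * 1024}
    -- Restore the default, which effectively disables the trigger.
    box.cfg{checkpoint_wal_threshold = 1e18}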
---
src/box/box.cc | 17 ++++-
src/box/box.h | 1 +
src/box/gc.c | 16 ++++-
src/box/gc.h | 13 ++++
src/box/lua/cfg.cc | 12 ++++
src/box/lua/load_cfg.lua | 3 +
src/box/wal.c | 118 ++++++++++++++++++++++++++++++--
src/box/wal.h | 23 ++++++-
test/app-tap/init_script.result | 87 +++++++++++------------
test/box/admin.result | 2 +
test/box/cfg.result | 4 ++
test/xlog/checkpoint_threshold.result | 115 +++++++++++++++++++++++++++++++
test/xlog/checkpoint_threshold.test.lua | 63 +++++++++++++++++
test/xlog/suite.ini | 2 +-
14 files changed, 423 insertions(+), 53 deletions(-)
create mode 100644 test/xlog/checkpoint_threshold.result
create mode 100644 test/xlog/checkpoint_threshold.test.lua
diff --git a/src/box/box.cc b/src/box/box.cc
index 771f2b8c..9f2fd6da 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -858,6 +858,13 @@ box_set_checkpoint_interval(void)
}
void
+box_set_checkpoint_wal_threshold(void)
+{
+ int64_t threshold = cfg_geti64("checkpoint_wal_threshold");
+ wal_set_checkpoint_threshold(threshold);
+}
+
+void
box_set_vinyl_memory(void)
{
struct vinyl_engine *vinyl;
@@ -2023,6 +2030,13 @@ on_wal_garbage_collection(const struct vclock *vclock)
gc_advance(vclock);
}
+static void
+on_wal_checkpoint_threshold(void)
+{
+ say_info("WAL threshold exceeded, triggering checkpoint");
+ gc_trigger_checkpoint();
+}
+
void
box_init(void)
{
@@ -2136,7 +2150,8 @@ box_cfg_xc(void)
enum wal_mode wal_mode = box_check_wal_mode(cfg_gets("wal_mode"));
if (wal_init(wal_mode, cfg_gets("wal_dir"), wal_max_rows,
wal_max_size, &INSTANCE_UUID, &replicaset.vclock,
- &checkpoint->vclock, on_wal_garbage_collection) != 0) {
+ &checkpoint->vclock, on_wal_garbage_collection,
+ on_wal_checkpoint_threshold) != 0) {
diag_raise();
}
diff --git a/src/box/box.h b/src/box/box.h
index 91e41a9d..6c6c319f 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -195,6 +195,7 @@ void box_set_too_long_threshold(void);
void box_set_readahead(void);
void box_set_checkpoint_count(void);
void box_set_checkpoint_interval(void);
+void box_set_checkpoint_wal_threshold(void);
void box_set_memtx_memory(void);
void box_set_memtx_max_tuple_size(void);
void box_set_vinyl_memory(void);
diff --git a/src/box/gc.c b/src/box/gc.c
index 6a7e371f..05503e68 100644
--- a/src/box/gc.c
+++ b/src/box/gc.c
@@ -426,6 +426,18 @@ gc_checkpoint(void)
return 0;
}
+void
+gc_trigger_checkpoint(void)
+{
+ if (gc.checkpoint_is_in_progress || gc.checkpoint_is_pending)
+ return;
+
+ gc.checkpoint_is_pending = true;
+ checkpoint_schedule_reset(&gc.checkpoint_schedule,
+ ev_monotonic_now(loop()));
+ fiber_wakeup(gc.checkpoint_fiber);
+}
+
static int
gc_checkpoint_fiber_f(va_list ap)
{
@@ -452,7 +464,8 @@ gc_checkpoint_fiber_f(va_list ap)
/* Periodic checkpointing is disabled. */
timeout = TIMEOUT_INFINITY;
}
- if (!fiber_yield_timeout(timeout)) {
+ if (!fiber_yield_timeout(timeout) &&
+ !gc.checkpoint_is_pending) {
/*
* The checkpoint schedule has changed.
* Reschedule the next checkpoint.
@@ -460,6 +473,7 @@ gc_checkpoint_fiber_f(va_list ap)
continue;
}
/* Time to make the next scheduled checkpoint. */
+ gc.checkpoint_is_pending = false;
if (gc.checkpoint_is_in_progress) {
/*
* Another fiber is making a checkpoint.
diff --git a/src/box/gc.h b/src/box/gc.h
index ffbafd34..5790ebcc 100644
--- a/src/box/gc.h
+++ b/src/box/gc.h
@@ -151,6 +151,11 @@ struct gc_state {
* Set if there's a fiber making a checkpoint right now.
*/
bool checkpoint_is_in_progress;
+ /**
+ * If this flag is set, the checkpoint daemon should create
+ * a checkpoint as soon as possible despite the schedule.
+ */
+ bool checkpoint_is_pending;
};
extern struct gc_state gc;
@@ -247,6 +252,14 @@ int
gc_checkpoint(void);
/**
+ * Trigger background checkpointing.
+ *
+ * The checkpoint will be created by the checkpoint daemon.
+ */
+void
+gc_trigger_checkpoint(void);
+
+/**
* Get a reference to @checkpoint and store it in @ref.
* This will block the garbage collector from deleting
* the checkpoint files until the reference is released
diff --git a/src/box/lua/cfg.cc b/src/box/lua/cfg.cc
index 4f08c78e..4884ce01 100644
--- a/src/box/lua/cfg.cc
+++ b/src/box/lua/cfg.cc
@@ -176,6 +176,17 @@ lbox_cfg_set_checkpoint_interval(struct lua_State *L)
}
static int
+lbox_cfg_set_checkpoint_wal_threshold(struct lua_State *L)
+{
+ try {
+ box_set_checkpoint_wal_threshold();
+ } catch (Exception *) {
+ luaT_error(L);
+ }
+ return 0;
+}
+
+static int
lbox_cfg_set_read_only(struct lua_State *L)
{
try {
@@ -352,6 +363,7 @@ box_lua_cfg_init(struct lua_State *L)
{"cfg_set_snap_io_rate_limit", lbox_cfg_set_snap_io_rate_limit},
{"cfg_set_checkpoint_count", lbox_cfg_set_checkpoint_count},
{"cfg_set_checkpoint_interval", lbox_cfg_set_checkpoint_interval},
+ {"cfg_set_checkpoint_wal_threshold", lbox_cfg_set_checkpoint_wal_threshold},
{"cfg_set_read_only", lbox_cfg_set_read_only},
{"cfg_set_memtx_memory", lbox_cfg_set_memtx_memory},
{"cfg_set_memtx_max_tuple_size", lbox_cfg_set_memtx_max_tuple_size},
diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua
index 321fd3ad..6dc4a2af 100644
--- a/src/box/lua/load_cfg.lua
+++ b/src/box/lua/load_cfg.lua
@@ -68,6 +68,7 @@ local default_cfg = {
read_only = false,
hot_standby = false,
checkpoint_interval = 3600,
+ checkpoint_wal_threshold = 1e18,
checkpoint_count = 2,
worker_pool_threads = 4,
replication_timeout = 1,
@@ -128,6 +129,7 @@ local template_cfg = {
username = 'string',
coredump = 'boolean',
checkpoint_interval = 'number',
+ checkpoint_wal_threshold = 'number',
checkpoint_count = 'number',
read_only = 'boolean',
hot_standby = 'boolean',
@@ -228,6 +230,7 @@ local dynamic_cfg = {
vinyl_timeout = private.cfg_set_vinyl_timeout,
checkpoint_count = private.cfg_set_checkpoint_count,
checkpoint_interval = private.cfg_set_checkpoint_interval,
+ checkpoint_wal_threshold = private.cfg_set_checkpoint_wal_threshold,
worker_pool_threads = private.cfg_set_worker_pool_threads,
feedback_enabled = private.feedback_daemon.set_feedback_params,
feedback_host = private.feedback_daemon.set_feedback_params,
diff --git a/src/box/wal.c b/src/box/wal.c
index 8e56e6ae..3b50d362 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -92,6 +92,7 @@ struct wal_writer
struct journal base;
/* ----------------- tx ------------------- */
wal_on_garbage_collection_f on_garbage_collection;
+ wal_on_checkpoint_threshold_f on_checkpoint_threshold;
/**
* The rollback queue. An accumulator for all requests
* that need to be rolled back. Also acts as a valve
@@ -126,6 +127,23 @@ struct wal_writer
* recover from it even if it is running out of disk space.
*/
struct vclock checkpoint_vclock;
+ /** Total size of WAL files written since the last checkpoint. */
+ int64_t checkpoint_wal_size;
+ /**
+ * Checkpoint threshold: when the total size of WAL files
+ * written since the last checkpoint exceeds the value of
+ * this variable, the WAL thread will notify TX that it's
+ * time to trigger checkpointing.
+ */
+ int64_t checkpoint_threshold;
+ /**
+ * This flag is set if the WAL thread has notified TX that
+ * the checkpoint threshold has been exceeded. It is cleared
+ * on checkpoint completion. Needed in order not to invoke
+ * the TX callback over and over again while checkpointing
+ * is in progress.
+ */
+ bool checkpoint_triggered;
/** The current WAL file. */
struct xlog current_wal;
/**
@@ -309,6 +327,14 @@ tx_notify_gc(struct cmsg *msg)
free(msg);
}
+static void
+tx_notify_checkpoint(struct cmsg *msg)
+{
+ struct wal_writer *writer = &wal_writer_singleton;
+ writer->on_checkpoint_threshold();
+ free(msg);
+}
+
/**
* Initialize WAL writer context. Even though it's a singleton,
* encapsulate the details just in case we may use
@@ -320,7 +346,8 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
int64_t wal_max_size, const struct tt_uuid *instance_uuid,
const struct vclock *vclock,
const struct vclock *checkpoint_vclock,
- wal_on_garbage_collection_f on_garbage_collection)
+ wal_on_garbage_collection_f on_garbage_collection,
+ wal_on_checkpoint_threshold_f on_checkpoint_threshold)
{
writer->wal_mode = wal_mode;
writer->wal_max_rows = wal_max_rows;
@@ -336,11 +363,16 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
stailq_create(&writer->rollback);
cmsg_init(&writer->in_rollback, NULL);
+ writer->checkpoint_wal_size = 0;
+ writer->checkpoint_threshold = INT64_MAX;
+ writer->checkpoint_triggered = false;
+
vclock_copy(&writer->vclock, vclock);
vclock_copy(&writer->checkpoint_vclock, checkpoint_vclock);
rlist_create(&writer->watchers);
writer->on_garbage_collection = on_garbage_collection;
+ writer->on_checkpoint_threshold = on_checkpoint_threshold;
}
/** Destroy a WAL writer structure. */
@@ -446,14 +478,16 @@ int
wal_init(enum wal_mode wal_mode, const char *wal_dirname, int64_t wal_max_rows,
int64_t wal_max_size, const struct tt_uuid *instance_uuid,
const struct vclock *vclock, const struct vclock *checkpoint_vclock,
- wal_on_garbage_collection_f on_garbage_collection)
+ wal_on_garbage_collection_f on_garbage_collection,
+ wal_on_checkpoint_threshold_f on_checkpoint_threshold)
{
assert(wal_max_rows > 1);
struct wal_writer *writer = &wal_writer_singleton;
wal_writer_create(writer, wal_mode, wal_dirname, wal_max_rows,
wal_max_size, instance_uuid, vclock,
- checkpoint_vclock, on_garbage_collection);
+ checkpoint_vclock, on_garbage_collection,
+ on_checkpoint_threshold);
/*
* Scan the WAL directory to build an index of all
@@ -524,6 +558,7 @@ wal_begin_checkpoint_f(struct cbus_call_msg *data)
*/
}
vclock_copy(&msg->vclock, &writer->vclock);
+ msg->wal_size = writer->checkpoint_wal_size;
return 0;
}
@@ -533,6 +568,7 @@ wal_begin_checkpoint(struct wal_checkpoint *checkpoint)
struct wal_writer *writer = &wal_writer_singleton;
if (writer->wal_mode == WAL_NONE) {
vclock_copy(&checkpoint->vclock, &writer->vclock);
+ checkpoint->wal_size = 0;
return 0;
}
if (!stailq_empty(&writer->rollback)) {
@@ -561,7 +597,20 @@ wal_commit_checkpoint_f(struct cbus_call_msg *data)
{
struct wal_checkpoint *msg = (struct wal_checkpoint *) data;
struct wal_writer *writer = &wal_writer_singleton;
+ /*
+ * Now, once checkpoint has been created, we can update
+ * the WAL's version of the last checkpoint vclock and
+ * reset the size of WAL files written since the last
+ * checkpoint. Note, since new WAL records may have been
+ * written while the checkpoint was created, we subtract
+ * the value of checkpoint_wal_size observed at the time
+ * when checkpointing started from the current value
+ * rather than just setting it to 0.
+ */
vclock_copy(&writer->checkpoint_vclock, &msg->vclock);
+ assert(writer->checkpoint_wal_size >= msg->wal_size);
+ writer->checkpoint_wal_size -= msg->wal_size;
+ writer->checkpoint_triggered = false;
return 0;
}
@@ -580,6 +629,36 @@ wal_commit_checkpoint(struct wal_checkpoint *checkpoint)
fiber_set_cancellable(cancellable);
}
+struct wal_set_checkpoint_threshold_msg {
+ struct cbus_call_msg base;
+ int64_t checkpoint_threshold;
+};
+
+static int
+wal_set_checkpoint_threshold_f(struct cbus_call_msg *data)
+{
+ struct wal_writer *writer = &wal_writer_singleton;
+ struct wal_set_checkpoint_threshold_msg *msg;
+ msg = (struct wal_set_checkpoint_threshold_msg *)data;
+ writer->checkpoint_threshold = msg->checkpoint_threshold;
+ return 0;
+}
+
+void
+wal_set_checkpoint_threshold(int64_t threshold)
+{
+ struct wal_writer *writer = &wal_writer_singleton;
+ if (writer->wal_mode == WAL_NONE)
+ return;
+ struct wal_set_checkpoint_threshold_msg msg;
+ msg.checkpoint_threshold = threshold;
+ bool cancellable = fiber_set_cancellable(false);
+ cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe,
+ &msg.base, wal_set_checkpoint_threshold_f, NULL,
+ TIMEOUT_INFINITY);
+ fiber_set_cancellable(cancellable);
+}
+
struct wal_gc_msg
{
struct cbus_call_msg base;
@@ -891,23 +970,50 @@ wal_write_to_disk(struct cmsg *msg)
/*
* Iterate over requests (transactions)
*/
+ int rc;
struct journal_entry *entry;
struct stailq_entry *last_committed = NULL;
stailq_foreach_entry(entry, &wal_msg->commit, fifo) {
wal_assign_lsn(writer, entry->rows, entry->rows + entry->n_rows);
entry->res = vclock_sum(&writer->vclock);
- int rc = xlog_write_entry(l, entry);
+ rc = xlog_write_entry(l, entry);
if (rc < 0)
goto done;
- if (rc > 0)
+ if (rc > 0) {
+ writer->checkpoint_wal_size += rc;
last_committed = &entry->fifo;
+ }
/* rc == 0: the write is buffered in xlog_tx */
}
- if (xlog_flush(l) < 0)
+ rc = xlog_flush(l);
+ if (rc < 0)
goto done;
+ writer->checkpoint_wal_size += rc;
last_committed = stailq_last(&wal_msg->commit);
+ /*
+ * Notify TX if the checkpoint threshold has been exceeded.
+ * Use malloc() for allocating the notification message and
+ * don't panic on error, because if we fail to send the
+ * message now, we will retry next time we process a request.
+ */
+ if (!writer->checkpoint_triggered &&
+ writer->checkpoint_wal_size > writer->checkpoint_threshold) {
+ static struct cmsg_hop route[] = {
+ { tx_notify_checkpoint, NULL },
+ };
+ struct cmsg *msg = malloc(sizeof(*msg));
+ if (msg != NULL) {
+ cmsg_init(msg, route);
+ cpipe_push(&wal_thread.tx_prio_pipe, msg);
+ writer->checkpoint_triggered = true;
+ } else {
+ say_warn("failed to allocate checkpoint "
+ "notification message");
+ }
+ }
+
done:
error = diag_last_error(diag_get());
if (error) {
diff --git a/src/box/wal.h b/src/box/wal.h
index 2564f718..a9452f2b 100644
--- a/src/box/wal.h
+++ b/src/box/wal.h
@@ -61,6 +61,13 @@ extern "C" {
*/
typedef void (*wal_on_garbage_collection_f)(const struct vclock *vclock);
+/**
+ * Callback invoked in the TX thread when the total size of WAL
+ * files written since the last checkpoint exceeds the configured
+ * threshold.
+ */
+typedef void (*wal_on_checkpoint_threshold_f)(void);
+
void
wal_thread_start();
@@ -68,7 +75,8 @@ int
wal_init(enum wal_mode wal_mode, const char *wal_dirname, int64_t wal_max_rows,
int64_t wal_max_size, const struct tt_uuid *instance_uuid,
const struct vclock *vclock, const struct vclock *checkpoint_vclock,
- wal_on_garbage_collection_f on_garbage_collection);
+ wal_on_garbage_collection_f on_garbage_collection,
+ wal_on_checkpoint_threshold_f on_checkpoint_threshold);
void
wal_thread_stop();
@@ -168,6 +176,12 @@ struct wal_checkpoint {
* identify the new checkpoint.
*/
struct vclock vclock;
+ /**
+ * Size of WAL files written since the last checkpoint.
+ * Used to reset the corresponding WAL counter upon
+ * successful checkpoint creation.
+ */
+ int64_t wal_size;
};
/**
@@ -190,6 +204,13 @@ void
wal_commit_checkpoint(struct wal_checkpoint *checkpoint);
/**
+ * Set the WAL size threshold exceeding which will trigger
+ * checkpointing in TX.
+ */
+void
+wal_set_checkpoint_threshold(int64_t threshold);
+
+/**
* Remove WAL files that are not needed by consumers reading
* rows at @vclock or newer.
*/
diff --git a/test/app-tap/init_script.result b/test/app-tap/init_script.result
index b03e5159..70a4b258 100644
--- a/test/app-tap/init_script.result
+++ b/test/app-tap/init_script.result
@@ -6,49 +6,50 @@ box.cfg
1 background:false
2 checkpoint_count:2
3 checkpoint_interval:3600
-4 coredump:false
-5 feedback_enabled:true
-6 feedback_host:https://feedback.tarantool.io
-7 feedback_interval:3600
-8 force_recovery:false
-9 hot_standby:false
-10 listen:port
-11 log:tarantool.log
-12 log_format:plain
-13 log_level:5
-14 memtx_dir:.
-15 memtx_max_tuple_size:1048576
-16 memtx_memory:107374182
-17 memtx_min_tuple_size:16
-18 net_msg_max:768
-19 pid_file:box.pid
-20 read_only:false
-21 readahead:16320
-22 replication_connect_timeout:30
-23 replication_skip_conflict:false
-24 replication_sync_lag:10
-25 replication_sync_timeout:300
-26 replication_timeout:1
-27 rows_per_wal:500000
-28 slab_alloc_factor:1.05
-29 too_long_threshold:0.5
-30 vinyl_bloom_fpr:0.05
-31 vinyl_cache:134217728
-32 vinyl_dir:.
-33 vinyl_max_tuple_size:1048576
-34 vinyl_memory:134217728
-35 vinyl_page_size:8192
-36 vinyl_range_size:1073741824
-37 vinyl_read_threads:1
-38 vinyl_run_count_per_level:2
-39 vinyl_run_size_ratio:3.5
-40 vinyl_timeout:60
-41 vinyl_write_threads:4
-42 wal_dir:.
-43 wal_dir_rescan_delay:2
-44 wal_max_size:268435456
-45 wal_mode:write
-46 worker_pool_threads:4
+4 checkpoint_wal_threshold:1e+18
+5 coredump:false
+6 feedback_enabled:true
+7 feedback_host:https://feedback.tarantool.io
+8 feedback_interval:3600
+9 force_recovery:false
+10 hot_standby:false
+11 listen:port
+12 log:tarantool.log
+13 log_format:plain
+14 log_level:5
+15 memtx_dir:.
+16 memtx_max_tuple_size:1048576
+17 memtx_memory:107374182
+18 memtx_min_tuple_size:16
+19 net_msg_max:768
+20 pid_file:box.pid
+21 read_only:false
+22 readahead:16320
+23 replication_connect_timeout:30
+24 replication_skip_conflict:false
+25 replication_sync_lag:10
+26 replication_sync_timeout:300
+27 replication_timeout:1
+28 rows_per_wal:500000
+29 slab_alloc_factor:1.05
+30 too_long_threshold:0.5
+31 vinyl_bloom_fpr:0.05
+32 vinyl_cache:134217728
+33 vinyl_dir:.
+34 vinyl_max_tuple_size:1048576
+35 vinyl_memory:134217728
+36 vinyl_page_size:8192
+37 vinyl_range_size:1073741824
+38 vinyl_read_threads:1
+39 vinyl_run_count_per_level:2
+40 vinyl_run_size_ratio:3.5
+41 vinyl_timeout:60
+42 vinyl_write_threads:4
+43 wal_dir:.
+44 wal_dir_rescan_delay:2
+45 wal_max_size:268435456
+46 wal_mode:write
+47 worker_pool_threads:4
--
-- Test insert from detached fiber
--
diff --git a/test/box/admin.result b/test/box/admin.result
index 6da53f30..0b233889 100644
--- a/test/box/admin.result
+++ b/test/box/admin.result
@@ -32,6 +32,8 @@ cfg_filter(box.cfg)
- 2
- - checkpoint_interval
- 3600
+ - - checkpoint_wal_threshold
+ - 1000000000000000000
- - coredump
- false
- - feedback_enabled
diff --git a/test/box/cfg.result b/test/box/cfg.result
index 01e6bc6b..68465669 100644
--- a/test/box/cfg.result
+++ b/test/box/cfg.result
@@ -20,6 +20,8 @@ cfg_filter(box.cfg)
- 2
- - checkpoint_interval
- 3600
+ - - checkpoint_wal_threshold
+ - 1000000000000000000
- - coredump
- false
- - feedback_enabled
@@ -119,6 +121,8 @@ cfg_filter(box.cfg)
- 2
- - checkpoint_interval
- 3600
+ - - checkpoint_wal_threshold
+ - 1000000000000000000
- - coredump
- false
- - feedback_enabled
diff --git a/test/xlog/checkpoint_threshold.result b/test/xlog/checkpoint_threshold.result
new file mode 100644
index 00000000..f1afec7c
--- /dev/null
+++ b/test/xlog/checkpoint_threshold.result
@@ -0,0 +1,115 @@
+test_run = require('test_run').new()
+---
+...
+fiber = require('fiber')
+---
+...
+digest = require('digest')
+---
+...
+default_threshold = box.cfg.checkpoint_wal_threshold
+---
+...
+threshold = 10 * 1024
+---
+...
+box.cfg{checkpoint_wal_threshold = threshold}
+---
+...
+s = box.schema.space.create('test')
+---
+...
+_ = s:create_index('pk')
+---
+...
+box.snapshot()
+---
+- ok
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+function put(size)
+ s:auto_increment{digest.urandom(size)}
+end;
+---
+...
+function wait_checkpoint(signature)
+ signature = signature or box.info.signature
+ return test_run:wait_cond(function()
+ local checkpoints = box.info.gc().checkpoints
+ return signature == checkpoints[#checkpoints].signature
+ end, 10)
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+--
+-- Check that checkpointing is triggered automatically once
+-- the size of WAL files written since the last checkpoint
+-- exceeds box.cfg.checkpoint_wal_threshold (gh-1082).
+--
+for i = 1, 3 do put(threshold / 3) end
+---
+...
+wait_checkpoint()
+---
+- true
+...
+for i = 1, 5 do put(threshold / 5) end
+---
+...
+wait_checkpoint()
+---
+- true
+...
+--
+-- Check that WAL rows written while a checkpoint was created
+-- are accounted as written after the checkpoint.
+--
+box.error.injection.set('ERRINJ_SNAP_COMMIT_DELAY', true)
+---
+- ok
+...
+-- This should trigger checkpointing, which will take quite
+-- a while due to the injected delay.
+for i = 1, 5 do put(threshold / 5) end
+---
+...
+fiber.sleep(0)
+---
+...
+-- Remember the future checkpoint signature.
+signature = box.info.signature
+---
+...
+-- Insert some records while the checkpoint is created.
+for i = 1, 4 do put(threshold / 5) end
+---
+...
+-- Disable the delay and wait for checkpointing to complete.
+box.error.injection.set('ERRINJ_SNAP_COMMIT_DELAY', false)
+---
+- ok
+...
+wait_checkpoint(signature)
+---
+- true
+...
+-- Check that insertion of one more record triggers another
+-- checkpoint, because it sums up with records written while
+-- the previous checkpoint was created.
+put(threshold / 5)
+---
+...
+wait_checkpoint()
+---
+- true
+...
+box.cfg{checkpoint_wal_threshold = default_threshold}
+---
+...
diff --git a/test/xlog/checkpoint_threshold.test.lua b/test/xlog/checkpoint_threshold.test.lua
new file mode 100644
index 00000000..cd55de09
--- /dev/null
+++ b/test/xlog/checkpoint_threshold.test.lua
@@ -0,0 +1,63 @@
+test_run = require('test_run').new()
+fiber = require('fiber')
+digest = require('digest')
+
+default_threshold = box.cfg.checkpoint_wal_threshold
+threshold = 10 * 1024
+box.cfg{checkpoint_wal_threshold = threshold}
+
+s = box.schema.space.create('test')
+_ = s:create_index('pk')
+box.snapshot()
+
+test_run:cmd("setopt delimiter ';'")
+function put(size)
+ s:auto_increment{digest.urandom(size)}
+end;
+function wait_checkpoint(signature)
+ signature = signature or box.info.signature
+ return test_run:wait_cond(function()
+ local checkpoints = box.info.gc().checkpoints
+ return signature == checkpoints[#checkpoints].signature
+ end, 10)
+end;
+test_run:cmd("setopt delimiter ''");
+
+--
+-- Check that checkpointing is triggered automatically once
+-- the size of WAL files written since the last checkpoint
+-- exceeds box.cfg.checkpoint_wal_threshold (gh-1082).
+--
+for i = 1, 3 do put(threshold / 3) end
+wait_checkpoint()
+for i = 1, 5 do put(threshold / 5) end
+wait_checkpoint()
+
+--
+-- Check that WAL rows written while a checkpoint was created
+-- are accounted as written after the checkpoint.
+--
+box.error.injection.set('ERRINJ_SNAP_COMMIT_DELAY', true)
+
+-- This should trigger checkpointing, which will take quite
+-- a while due to the injected delay.
+for i = 1, 5 do put(threshold / 5) end
+fiber.sleep(0)
+
+-- Remember the future checkpoint signature.
+signature = box.info.signature
+
+-- Insert some records while the checkpoint is created.
+for i = 1, 4 do put(threshold / 5) end
+
+-- Disable the delay and wait for checkpointing to complete.
+box.error.injection.set('ERRINJ_SNAP_COMMIT_DELAY', false)
+wait_checkpoint(signature)
+
+-- Check that insertion of one more record triggers another
+-- checkpoint, because it sums up with records written while
+-- the previous checkpoint was created.
+put(threshold / 5)
+wait_checkpoint()
+
+box.cfg{checkpoint_wal_threshold = default_threshold}
diff --git a/test/xlog/suite.ini b/test/xlog/suite.ini
index 4f82295d..4043f370 100644
--- a/test/xlog/suite.ini
+++ b/test/xlog/suite.ini
@@ -4,7 +4,7 @@ description = tarantool write ahead log tests
script = xlog.lua
disabled = snap_io_rate.test.lua upgrade.test.lua
valgrind_disabled =
-release_disabled = errinj.test.lua panic_on_lsn_gap.test.lua panic_on_broken_lsn.test.lua
+release_disabled = errinj.test.lua panic_on_lsn_gap.test.lua panic_on_broken_lsn.test.lua checkpoint_threshold.test.lua
config = suite.cfg
use_unix_sockets = True
long_run = snap_io_rate.test.lua
--
2.11.0
^ permalink raw reply [flat|nested] 22+ messages in thread
* Re: [PATCH v2 01/10] gc: do not use WAL watcher API for deactivating stale consumers
2018-12-08 15:48 ` [PATCH v2 01/10] gc: do not use WAL watcher API for deactivating stale consumers Vladimir Davydov
@ 2018-12-08 21:41 ` Konstantin Osipov
0 siblings, 0 replies; 22+ messages in thread
From: Konstantin Osipov @ 2018-12-08 21:41 UTC (permalink / raw)
To: Vladimir Davydov; +Cc: tarantool-patches
* Vladimir Davydov <vdavydov.dev@gmail.com> [18/12/09 00:15]:
> The WAL thread may delete old WAL files if it gets ENOSPC error.
> Currently, we use WAL watcher API to notify the TX thread about it so
> that it can shoot off stale replicas. This looks ugly, because WAL
> watcher API was initially designed to propagate WAL changes to relay
> threads and the new event WAL_EVENT_GC, which was introduced for
> notifying about ENOSPC-driven garbage collection, isn't used anywhere
> else. Besides, there's already a pipe from WAL to TX - we could reuse it
> instead of opening another one.
>
> If we went down that path, then in order to trigger a checkpoint
> from the WAL thread (see #1082), we would have to introduce yet another
> esoteric WAL watcher event, making the whole design look even uglier.
> So let's rewrite the garbage collection notification procedure
> using a plain callback instead of abusing the WAL watcher API.
OK to push.
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
^ permalink raw reply [flat|nested] 22+ messages in thread
* Re: [PATCH v2 02/10] wal: simplify watcher API
2018-12-08 15:48 ` [PATCH v2 02/10] wal: simplify watcher API Vladimir Davydov
@ 2018-12-08 21:41 ` Konstantin Osipov
0 siblings, 0 replies; 22+ messages in thread
From: Konstantin Osipov @ 2018-12-08 21:41 UTC (permalink / raw)
To: Vladimir Davydov; +Cc: tarantool-patches
* Vladimir Davydov <vdavydov.dev@gmail.com> [18/12/09 00:15]:
> This patch reverts changes done in order to make WAL watcher API
> suitable for notifying TX about WAL garbage collection triggered on
> ENOSPC, namely:
OK to push.
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
^ permalink raw reply [flat|nested] 22+ messages in thread
* Re: [PATCH v2 03/10] box: fix certain cfg options initialized twice on recovery
2018-12-08 15:48 ` [PATCH v2 03/10] box: fix certain cfg options initialized twice on recovery Vladimir Davydov
@ 2018-12-08 21:42 ` Konstantin Osipov
0 siblings, 0 replies; 22+ messages in thread
From: Konstantin Osipov @ 2018-12-08 21:42 UTC (permalink / raw)
To: Vladimir Davydov; +Cc: tarantool-patches
* Vladimir Davydov <vdavydov.dev@gmail.com> [18/12/09 00:15]:
> Certain dynamic configuration options are initialized right in box.cc,
> because they are needed for recovery. All such options are supposed to
> be present in dynamic_cfg_skip_at_load table so that load_cfg.lua won't
> try to set them again upon recovery completion. However, not all of them
> happen to be there - sometimes we simply forgot to patch this table after
> introducing a new configuration option. This patch adds all the
> missing ones except checkpoint_count - there's no point in initializing
> checkpoint_count in box.cc, so this patch removes it from box.cc instead.
OK to push.
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
^ permalink raw reply [flat|nested] 22+ messages in thread
* Re: [PATCH v2 04/10] box: don't use box_checkpoint_is_in_progress outside box.cc
2018-12-08 15:48 ` [PATCH v2 04/10] box: don't use box_checkpoint_is_in_progress outside box.cc Vladimir Davydov
@ 2018-12-08 21:43 ` Konstantin Osipov
0 siblings, 0 replies; 22+ messages in thread
From: Konstantin Osipov @ 2018-12-08 21:43 UTC (permalink / raw)
To: Vladimir Davydov; +Cc: tarantool-patches
* Vladimir Davydov <vdavydov.dev@gmail.com> [18/12/09 00:15]:
> We only use box_checkpoint_is_in_progress in the SIGUSR1 signal handler to
> print a warning in case checkpointing cannot be started because it's
> already being done by another fiber. Actually, it's not necessary to use it
> there - instead we can simply log the error returned by box_checkpoint,
> which will be the self-explanatory ER_CHECKPOINT_IN_PROGRESS in this case.
> So let's make box_checkpoint_is_in_progress private to box.cc - this
> will simplify moving the checkpoint daemon to the gc module.
OK to push.
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
^ permalink raw reply [flat|nested] 22+ messages in thread
* Re: [PATCH v2 05/10] box: move checkpointing to gc module
2018-12-08 15:48 ` [PATCH v2 05/10] box: move checkpointing to gc module Vladimir Davydov
@ 2018-12-08 21:44 ` Konstantin Osipov
0 siblings, 0 replies; 22+ messages in thread
From: Konstantin Osipov @ 2018-12-08 21:44 UTC (permalink / raw)
To: Vladimir Davydov; +Cc: tarantool-patches
* Vladimir Davydov <vdavydov.dev@gmail.com> [18/12/09 00:15]:
> The garbage collection module seems to be the best place to accommodate the
> checkpoint daemon, but to move it there, we first need to move the code
> performing checkpoints there to avoid a cross-dependency between box.cc
> and gc.c.
OK to push.
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
^ permalink raw reply [flat|nested] 22+ messages in thread
* Re: [PATCH v2 06/10] gc: some renames
2018-12-08 15:48 ` [PATCH v2 06/10] gc: some renames Vladimir Davydov
@ 2018-12-08 21:44 ` Konstantin Osipov
0 siblings, 0 replies; 22+ messages in thread
From: Konstantin Osipov @ 2018-12-08 21:44 UTC (permalink / raw)
To: Vladimir Davydov; +Cc: tarantool-patches
* Vladimir Davydov <vdavydov.dev@gmail.com> [18/12/09 00:15]:
> The GC module is responsible not only for garbage collection, but also for
> tracking consumers and making checkpoints. Soon it will also incorporate
> the checkpoint daemon. Let's prefix all members related to the cleanup
> procedure accordingly to avoid confusion.
> ---
OK to push.
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
^ permalink raw reply [flat|nested] 22+ messages in thread
* Re: [PATCH v2 07/10] Introduce checkpoint schedule module
2018-12-08 15:48 ` [PATCH v2 07/10] Introduce checkpoint schedule module Vladimir Davydov
@ 2018-12-08 21:45 ` Konstantin Osipov
0 siblings, 0 replies; 22+ messages in thread
From: Konstantin Osipov @ 2018-12-08 21:45 UTC (permalink / raw)
To: Vladimir Davydov; +Cc: tarantool-patches
* Vladimir Davydov <vdavydov.dev@gmail.com> [18/12/09 00:15]:
> This is a very simple module that incorporates the logic for calculating
> the time of the next scheduled checkpoint given the configured interval
> between checkpoints. It doesn't have any dependencies, which allows us to
> cover it with a unit test. It will be used by the checkpoint daemon once
> we rewrite it in C. Rationale: in the future we might want to introduce more
> complex rules for scheduling checkpoints (maybe cron-like) and it will
> be really nice to have this logic neatly separated and tested.
OK to push.
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
^ permalink raw reply [flat|nested] 22+ messages in thread
* Re: [PATCH v2 08/10] Rewrite checkpoint daemon in C
2018-12-08 15:48 ` [PATCH v2 08/10] Rewrite checkpoint daemon in C Vladimir Davydov
@ 2018-12-08 21:45 ` Konstantin Osipov
0 siblings, 0 replies; 22+ messages in thread
From: Konstantin Osipov @ 2018-12-08 21:45 UTC (permalink / raw)
To: Vladimir Davydov; +Cc: tarantool-patches
* Vladimir Davydov <vdavydov.dev@gmail.com> [18/12/09 00:15]:
> A long time ago, when the checkpoint daemon was added to Tarantool, it
> was responsible not only for making periodic checkpoints, but also for
> maintaining the configured number of checkpoints and removing old snap
> and xlog files, so it was much easier to implement it in Lua than in C.
> However, over time, all its responsibilities have been reimplemented in
> C and moved to the server code, so that now it just calls box.snapshot()
> periodically. Let's rewrite this simple procedure in C as well - this
> will allow us to easily add more complex logic there, e.g. triggering a
> checkpoint when the total WAL size exceeds a configured threshold.
>
> Note, this patch removes a few cases from xlog/checkpoint_daemon test
> that tested the internal state of the checkpoint daemon, which isn't
> available in Lua anymore. This is OK as those cases are covered by
> unit/checkpoint_schedule test.
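The daemon's core is then just a loop; a hedged standalone sketch with
stand-in primitives (the real code uses Tarantool's fiber API):

    #include <time.h>
    #include <unistd.h>

    /* Simplified stand-ins for fiber/clock primitives. */
    static double clock_now(void) { return (double)time(NULL); }
    static void fiber_sleep(double t) { if (t > 0) sleep((unsigned)t); }
    static int box_checkpoint(void) { return 0; /* stub */ }

    /* Sleep until the next schedule point, checkpoint, repeat.
     * A failure (e.g. a concurrent manual box.snapshot()) is
     * not fatal - we just wait for the next slot. */
    static void
    checkpoint_daemon_loop(double interval)
    {
            double next = clock_now() + interval;
            for (;;) {
                    fiber_sleep(next - clock_now());
                    (void)box_checkpoint();
                    next += interval;
            }
    }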
OK to push.
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
* Re: [PATCH v2 09/10] wal: pass struct instead of vclock to checkpoint methods
2018-12-08 15:48 ` [PATCH v2 09/10] wal: pass struct instead of vclock to checkpoint methods Vladimir Davydov
@ 2018-12-08 21:46 ` Konstantin Osipov
0 siblings, 0 replies; 22+ messages in thread
From: Konstantin Osipov @ 2018-12-08 21:46 UTC (permalink / raw)
To: Vladimir Davydov; +Cc: tarantool-patches
* Vladimir Davydov <vdavydov.dev@gmail.com> [18/12/09 00:15]:
> Currently, we only need to pass a vclock between TX and WAL during
> checkpointing. However, in order to implement auto-checkpointing
> triggered when the WAL size exceeds a certain threshold, we will need to
> pass some extra info so that we can properly reset the counter that
> accounts for the WAL size in the WAL thread. To make this possible,
> let's move the wal_checkpoint struct, which is used internally by WAL to
> pass a checkpoint vclock, to the header and require the caller to pass
> it to wal_begin/commit_checkpoint instead of just a vclock.
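To illustrate why a struct helps, a sketch - the wal_size field and the
counter below are illustrative; only the vclock exists in today's code:

    #include <stdint.h>

    struct vclock { int64_t signature; }; /* grossly simplified */

    /* Extra info travels together with the vclock between
     * the TX and WAL threads. */
    struct wal_checkpoint {
            struct vclock vclock; /* checkpoint position */
            int64_t wal_size;     /* WAL bytes seen at begin time */
    };

    static int64_t wal_bytes_since_checkpoint; /* WAL thread state */

    static void
    wal_begin_checkpoint(struct wal_checkpoint *ckpt)
    {
            ckpt->vclock.signature = 0; /* copy the current vclock */
            ckpt->wal_size = wal_bytes_since_checkpoint;
    }

    static void
    wal_commit_checkpoint(const struct wal_checkpoint *ckpt)
    {
            /* Subtract only what the checkpoint covered; bytes
             * written while it was in progress stay accounted. */
            wal_bytes_since_checkpoint -= ckpt->wal_size;
    }

Passing the size through the struct, rather than zeroing the counter at
commit time, avoids losing track of writes that raced with the checkpoint.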
OK to push.
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
* Re: [PATCH v2 10/10] wal: trigger checkpoint if there are too many WALs
2018-12-08 15:48 ` [PATCH v2 10/10] wal: trigger checkpoint if there are too many WALs Vladimir Davydov
@ 2018-12-08 21:48 ` Konstantin Osipov
0 siblings, 0 replies; 22+ messages in thread
From: Konstantin Osipov @ 2018-12-08 21:48 UTC (permalink / raw)
To: Vladimir Davydov; +Cc: tarantool-patches
* Vladimir Davydov <vdavydov.dev@gmail.com> [18/12/09 00:15]:
> Closes #1082
>
> @TarantoolBot document
> Title: Document box.cfg.checkpoint_wal_threshold
>
> Tarantool makes checkpoints every box.cfg.checkpoint_interval seconds
> and keeps last box.cfg.checkpoint_count checkpoints. It also keeps all
> intermediate WAL files. Currently, it isn't possible to set a checkpoint
> trigger based on the sum size of WAL files, which makes it difficult to
> estimate the minimal amount of disk space that needs to be allotted to a
> Tarantool instance for storing WALs to eliminate the possibility of
> ENOSPC errors. For example, under normal conditions a Tarantool instance
> may write 1 GB of WAL files every box.cfg.checkpoint_interval seconds
> and so one may assume that 1 GB times box.cfg.checkpoint_count should be
> enough for the WAL partition, but there's no guarantee it won't write 10
> GB between checkpoints when the load is extreme.
>
> So we've agreed that we must provide users with one more configuration
> option that could be used to impose the limit on the sum size of WAL
> files. The new option is called box.cfg.checkpoint_wal_threshold. Once
> the configured threshold is exceeded, the WAL thread notifies the
> checkpoint daemon that it's time to make a new checkpoint and delete
> old WAL files. Note, the new option only limits the size of WAL files
> created since the last checkpoint, because backup WAL files are not
> needed for recovery and can be deleted in case of emergency ENOSPC; for
> more details see tarantool/tarantool#1082, tarantool/tarantool#3397,
> and tarantool/tarantool#3822.
>
> The default value of the new option is 1 exabyte (10^18 bytes), which
> effectively means that the feature is disabled.
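The accounting on the WAL side fits in a few lines; a sketch (variable
names and the notification mechanism are illustrative):

    #include <stdbool.h>
    #include <stdint.h>

    /* 10^18 bytes - in practice never reached, i.e. disabled. */
    static int64_t checkpoint_wal_threshold =
            INT64_C(1000000000000000000);
    static int64_t wal_bytes_since_checkpoint;
    static bool checkpoint_requested;

    /* Called in the WAL thread after each successful write. */
    static void
    wal_account_write(int64_t nbytes)
    {
            wal_bytes_since_checkpoint += nbytes;
            if (!checkpoint_requested &&
                wal_bytes_since_checkpoint >= checkpoint_wal_threshold) {
                    checkpoint_requested = true;
                    /* ask the checkpoint daemon in the TX thread
                     * to make a checkpoint, e.g. via a cbus message */
            }
    }

The flag keeps the daemon from being poked on every write once the
threshold is crossed; it would be reset when the checkpoint completes. In
Lua terms, box.cfg{checkpoint_wal_threshold = 1024^3} would request a
checkpoint once roughly 1 GB of WALs has accumulated since the last one.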
OK to push.
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
* Re: [PATCH v2 00/10] Allow to limit size of WAL files
2018-12-08 15:48 [PATCH v2 00/10] Allow to limit size of WAL files Vladimir Davydov
` (9 preceding siblings ...)
2018-12-08 15:48 ` [PATCH v2 10/10] wal: trigger checkpoint if there are too many WALs Vladimir Davydov
@ 2018-12-09 11:20 ` Vladimir Davydov
10 siblings, 0 replies; 22+ messages in thread
From: Vladimir Davydov @ 2018-12-09 11:20 UTC (permalink / raw)
To: kostja; +Cc: tarantool-patches
On Sat, Dec 08, 2018 at 06:48:04PM +0300, Vladimir Davydov wrote:
> Vladimir Davydov (10):
> gc: do not use WAL watcher API for deactivating stale consumers
> wal: simplify watcher API
> box: fix certain cfg options initialized twice on recovery
> box: don't use box_checkpoint_is_in_progress outside box.cc
> box: move checkpointing to gc module
> gc: some renames
> Introduce checkpoint schedule module
> Rewrite checkpoint daemon in C
> wal: pass struct instead of vclock to checkpoint methods
> wal: trigger checkpoint if there are too many WALs
Pushed to 2.1.
Thread overview: 22+ messages
2018-12-08 15:48 [PATCH v2 00/10] Allow to limit size of WAL files Vladimir Davydov
2018-12-08 15:48 ` [PATCH v2 01/10] gc: do not use WAL watcher API for deactivating stale consumers Vladimir Davydov
2018-12-08 21:41 ` Konstantin Osipov
2018-12-08 15:48 ` [PATCH v2 02/10] wal: simplify watcher API Vladimir Davydov
2018-12-08 21:41 ` Konstantin Osipov
2018-12-08 15:48 ` [PATCH v2 03/10] box: fix certain cfg options initialized twice on recovery Vladimir Davydov
2018-12-08 21:42 ` Konstantin Osipov
2018-12-08 15:48 ` [PATCH v2 04/10] box: don't use box_checkpoint_is_in_progress outside box.cc Vladimir Davydov
2018-12-08 21:43 ` Konstantin Osipov
2018-12-08 15:48 ` [PATCH v2 05/10] box: move checkpointing to gc module Vladimir Davydov
2018-12-08 21:44 ` Konstantin Osipov
2018-12-08 15:48 ` [PATCH v2 06/10] gc: some renames Vladimir Davydov
2018-12-08 21:44 ` Konstantin Osipov
2018-12-08 15:48 ` [PATCH v2 07/10] Introduce checkpoint schedule module Vladimir Davydov
2018-12-08 21:45 ` Konstantin Osipov
2018-12-08 15:48 ` [PATCH v2 08/10] Rewrite checkpoint daemon in C Vladimir Davydov
2018-12-08 21:45 ` Konstantin Osipov
2018-12-08 15:48 ` [PATCH v2 09/10] wal: pass struct instead of vclock to checkpoint methods Vladimir Davydov
2018-12-08 21:46 ` Konstantin Osipov
2018-12-08 15:48 ` [PATCH v2 10/10] wal: trigger checkpoint if there are too many WALs Vladimir Davydov
2018-12-08 21:48 ` Konstantin Osipov
2018-12-09 11:20 ` [PATCH v2 00/10] Allow to limit size of WAL files Vladimir Davydov