[PATCH v2 10/10] wal: trigger checkpoint if there are too many WALs

Vladimir Davydov vdavydov.dev at gmail.com
Sat Dec 8 18:48:14 MSK 2018


Closes #1082

@TarantoolBot document
Title: Document box.cfg.checkpoint_wal_threshold

Tarantool makes checkpoints every box.cfg.checkpoint_interval seconds
and keeps the last box.cfg.checkpoint_count checkpoints. It also keeps
all intermediate WAL files. Currently, it isn't possible to set a
checkpoint trigger based on the total size of WAL files, which makes it
difficult to estimate the minimal amount of disk space that must be
allotted to a Tarantool instance for storing WALs in order to rule out
ENOSPC errors. For example, under normal conditions a Tarantool
instance may write 1 GB of WAL files every box.cfg.checkpoint_interval
seconds, so one might assume that 1 GB times box.cfg.checkpoint_count
is enough for the WAL partition, but there is no guarantee the instance
won't write 10 GB between checkpoints under extreme load.

So we've agreed to provide users with one more configuration option
that can be used to impose a limit on the total size of WAL files. The
new option is called box.cfg.checkpoint_wal_threshold. Once the
configured threshold is exceeded, the WAL thread notifies the
checkpoint daemon that it's time to make a new checkpoint and delete
old WAL files. Note that the new option only limits the size of WAL
files created since the last checkpoint, because backup WAL files are
not needed for recovery and can be deleted in case of an emergency
ENOSPC. For more details, see tarantool/tarantool#1082,
tarantool/tarantool#3397, tarantool/tarantool#3822.

The default value of the new option is 1 exabyte (10^18 bytes), which
effectively means that the feature is disabled.
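
For example, to have the checkpoint daemon make a checkpoint ahead of
schedule once roughly 512 MB of WAL files have accumulated since the
last checkpoint (the value here is arbitrary, chosen purely for
illustration), one would set:

    box.cfg{checkpoint_wal_threshold = 512 * 1024 * 1024}

The option is dynamic, so it can be changed at runtime; setting it back
to the default (1e18) effectively turns the trigger off again.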
---
 src/box/box.cc                          |  17 ++++-
 src/box/box.h                           |   1 +
 src/box/gc.c                            |  16 ++++-
 src/box/gc.h                            |  13 ++++
 src/box/lua/cfg.cc                      |  12 ++++
 src/box/lua/load_cfg.lua                |   3 +
 src/box/wal.c                           | 118 ++++++++++++++++++++++++++++++--
 src/box/wal.h                           |  23 ++++++-
 test/app-tap/init_script.result         |  87 +++++++++++------------
 test/box/admin.result                   |   2 +
 test/box/cfg.result                     |   4 ++
 test/xlog/checkpoint_threshold.result   | 115 +++++++++++++++++++++++++++++++
 test/xlog/checkpoint_threshold.test.lua |  63 +++++++++++++++++
 test/xlog/suite.ini                     |   2 +-
 14 files changed, 423 insertions(+), 53 deletions(-)
 create mode 100644 test/xlog/checkpoint_threshold.result
 create mode 100644 test/xlog/checkpoint_threshold.test.lua

diff --git a/src/box/box.cc b/src/box/box.cc
index 771f2b8c..9f2fd6da 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -858,6 +858,13 @@ box_set_checkpoint_interval(void)
 }
 
 void
+box_set_checkpoint_wal_threshold(void)
+{
+	int64_t threshold = cfg_geti64("checkpoint_wal_threshold");
+	wal_set_checkpoint_threshold(threshold);
+}
+
+void
 box_set_vinyl_memory(void)
 {
 	struct vinyl_engine *vinyl;
@@ -2023,6 +2030,13 @@ on_wal_garbage_collection(const struct vclock *vclock)
 	gc_advance(vclock);
 }
 
+static void
+on_wal_checkpoint_threshold(void)
+{
+	say_info("WAL threshold exceeded, triggering checkpoint");
+	gc_trigger_checkpoint();
+}
+
 void
 box_init(void)
 {
@@ -2136,7 +2150,8 @@ box_cfg_xc(void)
 	enum wal_mode wal_mode = box_check_wal_mode(cfg_gets("wal_mode"));
 	if (wal_init(wal_mode, cfg_gets("wal_dir"), wal_max_rows,
 		     wal_max_size, &INSTANCE_UUID, &replicaset.vclock,
-		     &checkpoint->vclock, on_wal_garbage_collection) != 0) {
+		     &checkpoint->vclock, on_wal_garbage_collection,
+		     on_wal_checkpoint_threshold) != 0) {
 		diag_raise();
 	}
 
diff --git a/src/box/box.h b/src/box/box.h
index 91e41a9d..6c6c319f 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -195,6 +195,7 @@ void box_set_too_long_threshold(void);
 void box_set_readahead(void);
 void box_set_checkpoint_count(void);
 void box_set_checkpoint_interval(void);
+void box_set_checkpoint_wal_threshold(void);
 void box_set_memtx_memory(void);
 void box_set_memtx_max_tuple_size(void);
 void box_set_vinyl_memory(void);
diff --git a/src/box/gc.c b/src/box/gc.c
index 6a7e371f..05503e68 100644
--- a/src/box/gc.c
+++ b/src/box/gc.c
@@ -426,6 +426,18 @@ gc_checkpoint(void)
 	return 0;
 }
 
+void
+gc_trigger_checkpoint(void)
+{
+	if (gc.checkpoint_is_in_progress || gc.checkpoint_is_pending)
+		return;
+
+	gc.checkpoint_is_pending = true;
+	checkpoint_schedule_reset(&gc.checkpoint_schedule,
+				  ev_monotonic_now(loop()));
+	fiber_wakeup(gc.checkpoint_fiber);
+}
+
 static int
 gc_checkpoint_fiber_f(va_list ap)
 {
@@ -452,7 +464,8 @@ gc_checkpoint_fiber_f(va_list ap)
 			/* Periodic checkpointing is disabled. */
 			timeout = TIMEOUT_INFINITY;
 		}
-		if (!fiber_yield_timeout(timeout)) {
+		if (!fiber_yield_timeout(timeout) &&
+		    !gc.checkpoint_is_pending) {
 			/*
 			 * The checkpoint schedule has changed.
 			 * Reschedule the next checkpoint.
@@ -460,6 +473,7 @@ gc_checkpoint_fiber_f(va_list ap)
 			continue;
 		}
 		/* Time to make the next scheduled checkpoint. */
+		gc.checkpoint_is_pending = false;
 		if (gc.checkpoint_is_in_progress) {
 			/*
 			 * Another fiber is making a checkpoint.
diff --git a/src/box/gc.h b/src/box/gc.h
index ffbafd34..5790ebcc 100644
--- a/src/box/gc.h
+++ b/src/box/gc.h
@@ -151,6 +151,11 @@ struct gc_state {
 	 * Set if there's a fiber making a checkpoint right now.
 	 */
 	bool checkpoint_is_in_progress;
+	/**
+	 * If this flag is set, the checkpoint daemon should create
+	 * a checkpoint as soon as possible despite the schedule.
+	 */
+	bool checkpoint_is_pending;
 };
 extern struct gc_state gc;
 
@@ -247,6 +252,14 @@ int
 gc_checkpoint(void);
 
 /**
+ * Trigger background checkpointing.
+ *
+ * The checkpoint will be created by the checkpoint daemon.
+ */
+void
+gc_trigger_checkpoint(void);
+
+/**
  * Get a reference to @checkpoint and store it in @ref.
  * This will block the garbage collector from deleting
  * the checkpoint files until the reference is released
diff --git a/src/box/lua/cfg.cc b/src/box/lua/cfg.cc
index 4f08c78e..4884ce01 100644
--- a/src/box/lua/cfg.cc
+++ b/src/box/lua/cfg.cc
@@ -176,6 +176,17 @@ lbox_cfg_set_checkpoint_interval(struct lua_State *L)
 }
 
 static int
+lbox_cfg_set_checkpoint_wal_threshold(struct lua_State *L)
+{
+	try {
+		box_set_checkpoint_wal_threshold();
+	} catch (Exception *) {
+		luaT_error(L);
+	}
+	return 0;
+}
+
+static int
 lbox_cfg_set_read_only(struct lua_State *L)
 {
 	try {
@@ -352,6 +363,7 @@ box_lua_cfg_init(struct lua_State *L)
 		{"cfg_set_snap_io_rate_limit", lbox_cfg_set_snap_io_rate_limit},
 		{"cfg_set_checkpoint_count", lbox_cfg_set_checkpoint_count},
 		{"cfg_set_checkpoint_interval", lbox_cfg_set_checkpoint_interval},
+		{"cfg_set_checkpoint_wal_threshold", lbox_cfg_set_checkpoint_wal_threshold},
 		{"cfg_set_read_only", lbox_cfg_set_read_only},
 		{"cfg_set_memtx_memory", lbox_cfg_set_memtx_memory},
 		{"cfg_set_memtx_max_tuple_size", lbox_cfg_set_memtx_max_tuple_size},
diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua
index 321fd3ad..6dc4a2af 100644
--- a/src/box/lua/load_cfg.lua
+++ b/src/box/lua/load_cfg.lua
@@ -68,6 +68,7 @@ local default_cfg = {
     read_only           = false,
     hot_standby         = false,
     checkpoint_interval = 3600,
+    checkpoint_wal_threshold = 1e18,
     checkpoint_count    = 2,
     worker_pool_threads = 4,
     replication_timeout = 1,
@@ -128,6 +129,7 @@ local template_cfg = {
     username            = 'string',
     coredump            = 'boolean',
     checkpoint_interval = 'number',
+    checkpoint_wal_threshold = 'number',
     checkpoint_count    = 'number',
     read_only           = 'boolean',
     hot_standby         = 'boolean',
@@ -228,6 +230,7 @@ local dynamic_cfg = {
     vinyl_timeout           = private.cfg_set_vinyl_timeout,
     checkpoint_count        = private.cfg_set_checkpoint_count,
     checkpoint_interval     = private.cfg_set_checkpoint_interval,
+    checkpoint_wal_threshold = private.cfg_set_checkpoint_wal_threshold,
     worker_pool_threads     = private.cfg_set_worker_pool_threads,
     feedback_enabled        = private.feedback_daemon.set_feedback_params,
     feedback_host           = private.feedback_daemon.set_feedback_params,
diff --git a/src/box/wal.c b/src/box/wal.c
index 8e56e6ae..3b50d362 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -92,6 +92,7 @@ struct wal_writer
 	struct journal base;
 	/* ----------------- tx ------------------- */
 	wal_on_garbage_collection_f on_garbage_collection;
+	wal_on_checkpoint_threshold_f on_checkpoint_threshold;
 	/**
 	 * The rollback queue. An accumulator for all requests
 	 * that need to be rolled back. Also acts as a valve
@@ -126,6 +127,23 @@ struct wal_writer
 	 * recover from it even if it is running out of disk space.
 	 */
 	struct vclock checkpoint_vclock;
+	/** Total size of WAL files written since the last checkpoint. */
+	int64_t checkpoint_wal_size;
+	/**
+	 * Checkpoint threshold: when the total size of WAL files
+	 * written since the last checkpoint exceeds the value of
+	 * this variable, the WAL thread will notify TX that it's
+	 * time to trigger checkpointing.
+	 */
+	int64_t checkpoint_threshold;
+	/**
+	 * This flag is set if the WAL thread has notified TX that
+	 * the checkpoint threshold has been exceeded. It is cleared
+	 * on checkpoint completion. Needed in order not to invoke
+	 * the TX callback over and over again while checkpointing
+	 * is in progress.
+	 */
+	bool checkpoint_triggered;
 	/** The current WAL file. */
 	struct xlog current_wal;
 	/**
@@ -309,6 +327,14 @@ tx_notify_gc(struct cmsg *msg)
 	free(msg);
 }
 
+static void
+tx_notify_checkpoint(struct cmsg *msg)
+{
+	struct wal_writer *writer = &wal_writer_singleton;
+	writer->on_checkpoint_threshold();
+	free(msg);
+}
+
 /**
  * Initialize WAL writer context. Even though it's a singleton,
  * encapsulate the details just in case we may use
@@ -320,7 +346,8 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
 		  int64_t wal_max_size, const struct tt_uuid *instance_uuid,
 		  const struct vclock *vclock,
 		  const struct vclock *checkpoint_vclock,
-		  wal_on_garbage_collection_f on_garbage_collection)
+		  wal_on_garbage_collection_f on_garbage_collection,
+		  wal_on_checkpoint_threshold_f on_checkpoint_threshold)
 {
 	writer->wal_mode = wal_mode;
 	writer->wal_max_rows = wal_max_rows;
@@ -336,11 +363,16 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
 	stailq_create(&writer->rollback);
 	cmsg_init(&writer->in_rollback, NULL);
 
+	writer->checkpoint_wal_size = 0;
+	writer->checkpoint_threshold = INT64_MAX;
+	writer->checkpoint_triggered = false;
+
 	vclock_copy(&writer->vclock, vclock);
 	vclock_copy(&writer->checkpoint_vclock, checkpoint_vclock);
 	rlist_create(&writer->watchers);
 
 	writer->on_garbage_collection = on_garbage_collection;
+	writer->on_checkpoint_threshold = on_checkpoint_threshold;
 }
 
 /** Destroy a WAL writer structure. */
@@ -446,14 +478,16 @@ int
 wal_init(enum wal_mode wal_mode, const char *wal_dirname, int64_t wal_max_rows,
 	 int64_t wal_max_size, const struct tt_uuid *instance_uuid,
 	 const struct vclock *vclock, const struct vclock *checkpoint_vclock,
-	 wal_on_garbage_collection_f on_garbage_collection)
+	 wal_on_garbage_collection_f on_garbage_collection,
+	 wal_on_checkpoint_threshold_f on_checkpoint_threshold)
 {
 	assert(wal_max_rows > 1);
 
 	struct wal_writer *writer = &wal_writer_singleton;
 	wal_writer_create(writer, wal_mode, wal_dirname, wal_max_rows,
 			  wal_max_size, instance_uuid, vclock,
-			  checkpoint_vclock, on_garbage_collection);
+			  checkpoint_vclock, on_garbage_collection,
+			  on_checkpoint_threshold);
 
 	/*
 	 * Scan the WAL directory to build an index of all
@@ -524,6 +558,7 @@ wal_begin_checkpoint_f(struct cbus_call_msg *data)
 		 */
 	}
 	vclock_copy(&msg->vclock, &writer->vclock);
+	msg->wal_size = writer->checkpoint_wal_size;
 	return 0;
 }
 
@@ -533,6 +568,7 @@ wal_begin_checkpoint(struct wal_checkpoint *checkpoint)
 	struct wal_writer *writer = &wal_writer_singleton;
 	if (writer->wal_mode == WAL_NONE) {
 		vclock_copy(&checkpoint->vclock, &writer->vclock);
+		checkpoint->wal_size = 0;
 		return 0;
 	}
 	if (!stailq_empty(&writer->rollback)) {
@@ -561,7 +597,20 @@ wal_commit_checkpoint_f(struct cbus_call_msg *data)
 {
 	struct wal_checkpoint *msg = (struct wal_checkpoint *) data;
 	struct wal_writer *writer = &wal_writer_singleton;
+	/*
+	 * Now, once checkpoint has been created, we can update
+	 * the WAL's version of the last checkpoint vclock and
+	 * reset the size of WAL files written since the last
+	 * checkpoint. Note, since new WAL records may have been
+	 * written while the checkpoint was created, we subtract
+	 * the value of checkpoint_wal_size observed at the time
+	 * when checkpointing started from the current value
+	 * rather than just setting it to 0.
+	 */
 	vclock_copy(&writer->checkpoint_vclock, &msg->vclock);
+	assert(writer->checkpoint_wal_size >= msg->wal_size);
+	writer->checkpoint_wal_size -= msg->wal_size;
+	writer->checkpoint_triggered = false;
 	return 0;
 }
 
@@ -580,6 +629,36 @@ wal_commit_checkpoint(struct wal_checkpoint *checkpoint)
 	fiber_set_cancellable(cancellable);
 }
 
+struct wal_set_checkpoint_threshold_msg {
+	struct cbus_call_msg base;
+	int64_t checkpoint_threshold;
+};
+
+static int
+wal_set_checkpoint_threshold_f(struct cbus_call_msg *data)
+{
+	struct wal_writer *writer = &wal_writer_singleton;
+	struct wal_set_checkpoint_threshold_msg *msg;
+	msg = (struct wal_set_checkpoint_threshold_msg *)data;
+	writer->checkpoint_threshold = msg->checkpoint_threshold;
+	return 0;
+}
+
+void
+wal_set_checkpoint_threshold(int64_t threshold)
+{
+	struct wal_writer *writer = &wal_writer_singleton;
+	if (writer->wal_mode == WAL_NONE)
+		return;
+	struct wal_set_checkpoint_threshold_msg msg;
+	msg.checkpoint_threshold = threshold;
+	bool cancellable = fiber_set_cancellable(false);
+	cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe,
+		  &msg.base, wal_set_checkpoint_threshold_f, NULL,
+		  TIMEOUT_INFINITY);
+	fiber_set_cancellable(cancellable);
+}
+
 struct wal_gc_msg
 {
 	struct cbus_call_msg base;
@@ -891,23 +970,50 @@ wal_write_to_disk(struct cmsg *msg)
 	/*
 	 * Iterate over requests (transactions)
 	 */
+	int rc;
 	struct journal_entry *entry;
 	struct stailq_entry *last_committed = NULL;
 	stailq_foreach_entry(entry, &wal_msg->commit, fifo) {
 		wal_assign_lsn(writer, entry->rows, entry->rows + entry->n_rows);
 		entry->res = vclock_sum(&writer->vclock);
-		int rc = xlog_write_entry(l, entry);
+		rc = xlog_write_entry(l, entry);
 		if (rc < 0)
 			goto done;
-		if (rc > 0)
+		if (rc > 0) {
+			writer->checkpoint_wal_size += rc;
 			last_committed = &entry->fifo;
+		}
 		/* rc == 0: the write is buffered in xlog_tx */
 	}
-	if (xlog_flush(l) < 0)
+	rc = xlog_flush(l);
+	if (rc < 0)
 		goto done;
 
+	writer->checkpoint_wal_size += rc;
 	last_committed = stailq_last(&wal_msg->commit);
 
+	/*
+	 * Notify TX if the checkpoint threshold has been exceeded.
+	 * Use malloc() for allocating the notification message and
+	 * don't panic on error, because if we fail to send the
+	 * message now, we will retry next time we process a request.
+	 */
+	if (!writer->checkpoint_triggered &&
+	    writer->checkpoint_wal_size > writer->checkpoint_threshold) {
+		static struct cmsg_hop route[] = {
+			{ tx_notify_checkpoint, NULL },
+		};
+		struct cmsg *msg = malloc(sizeof(*msg));
+		if (msg != NULL) {
+			cmsg_init(msg, route);
+			cpipe_push(&wal_thread.tx_prio_pipe, msg);
+			writer->checkpoint_triggered = true;
+		} else {
+			say_warn("failed to allocate checkpoint "
+				 "notification message");
+		}
+	}
+
 done:
 	error = diag_last_error(diag_get());
 	if (error) {
diff --git a/src/box/wal.h b/src/box/wal.h
index 2564f718..a9452f2b 100644
--- a/src/box/wal.h
+++ b/src/box/wal.h
@@ -61,6 +61,13 @@ extern "C" {
  */
 typedef void (*wal_on_garbage_collection_f)(const struct vclock *vclock);
 
+/**
+ * Callback invoked in the TX thread when the total size of WAL
+ * files written since the last checkpoint exceeds the configured
+ * threshold.
+ */
+typedef void (*wal_on_checkpoint_threshold_f)(void);
+
 void
 wal_thread_start();
 
@@ -68,7 +75,8 @@ int
 wal_init(enum wal_mode wal_mode, const char *wal_dirname, int64_t wal_max_rows,
 	 int64_t wal_max_size, const struct tt_uuid *instance_uuid,
 	 const struct vclock *vclock, const struct vclock *checkpoint_vclock,
-	 wal_on_garbage_collection_f on_garbage_collection);
+	 wal_on_garbage_collection_f on_garbage_collection,
+	 wal_on_checkpoint_threshold_f on_checkpoint_threshold);
 
 void
 wal_thread_stop();
@@ -168,6 +176,12 @@ struct wal_checkpoint {
 	 * identify the new checkpoint.
 	 */
 	struct vclock vclock;
+	/**
+	 * Size of WAL files written since the last checkpoint.
+	 * Used to reset the corresponding WAL counter upon
+	 * successful checkpoint creation.
+	 */
+	int64_t wal_size;
 };
 
 /**
@@ -190,6 +204,13 @@ void
 wal_commit_checkpoint(struct wal_checkpoint *checkpoint);
 
 /**
+ * Set the WAL size threshold exceeding which will trigger
+ * checkpointing in TX.
+ */
+void
+wal_set_checkpoint_threshold(int64_t threshold);
+
+/**
  * Remove WAL files that are not needed by consumers reading
  * rows at @vclock or newer.
  */
diff --git a/test/app-tap/init_script.result b/test/app-tap/init_script.result
index b03e5159..70a4b258 100644
--- a/test/app-tap/init_script.result
+++ b/test/app-tap/init_script.result
@@ -6,49 +6,50 @@ box.cfg
 1	background:false
 2	checkpoint_count:2
 3	checkpoint_interval:3600
-4	coredump:false
-5	feedback_enabled:true
-6	feedback_host:https://feedback.tarantool.io
-7	feedback_interval:3600
-8	force_recovery:false
-9	hot_standby:false
-10	listen:port
-11	log:tarantool.log
-12	log_format:plain
-13	log_level:5
-14	memtx_dir:.
-15	memtx_max_tuple_size:1048576
-16	memtx_memory:107374182
-17	memtx_min_tuple_size:16
-18	net_msg_max:768
-19	pid_file:box.pid
-20	read_only:false
-21	readahead:16320
-22	replication_connect_timeout:30
-23	replication_skip_conflict:false
-24	replication_sync_lag:10
-25	replication_sync_timeout:300
-26	replication_timeout:1
-27	rows_per_wal:500000
-28	slab_alloc_factor:1.05
-29	too_long_threshold:0.5
-30	vinyl_bloom_fpr:0.05
-31	vinyl_cache:134217728
-32	vinyl_dir:.
-33	vinyl_max_tuple_size:1048576
-34	vinyl_memory:134217728
-35	vinyl_page_size:8192
-36	vinyl_range_size:1073741824
-37	vinyl_read_threads:1
-38	vinyl_run_count_per_level:2
-39	vinyl_run_size_ratio:3.5
-40	vinyl_timeout:60
-41	vinyl_write_threads:4
-42	wal_dir:.
-43	wal_dir_rescan_delay:2
-44	wal_max_size:268435456
-45	wal_mode:write
-46	worker_pool_threads:4
+4	checkpoint_wal_threshold:1e+18
+5	coredump:false
+6	feedback_enabled:true
+7	feedback_host:https://feedback.tarantool.io
+8	feedback_interval:3600
+9	force_recovery:false
+10	hot_standby:false
+11	listen:port
+12	log:tarantool.log
+13	log_format:plain
+14	log_level:5
+15	memtx_dir:.
+16	memtx_max_tuple_size:1048576
+17	memtx_memory:107374182
+18	memtx_min_tuple_size:16
+19	net_msg_max:768
+20	pid_file:box.pid
+21	read_only:false
+22	readahead:16320
+23	replication_connect_timeout:30
+24	replication_skip_conflict:false
+25	replication_sync_lag:10
+26	replication_sync_timeout:300
+27	replication_timeout:1
+28	rows_per_wal:500000
+29	slab_alloc_factor:1.05
+30	too_long_threshold:0.5
+31	vinyl_bloom_fpr:0.05
+32	vinyl_cache:134217728
+33	vinyl_dir:.
+34	vinyl_max_tuple_size:1048576
+35	vinyl_memory:134217728
+36	vinyl_page_size:8192
+37	vinyl_range_size:1073741824
+38	vinyl_read_threads:1
+39	vinyl_run_count_per_level:2
+40	vinyl_run_size_ratio:3.5
+41	vinyl_timeout:60
+42	vinyl_write_threads:4
+43	wal_dir:.
+44	wal_dir_rescan_delay:2
+45	wal_max_size:268435456
+46	wal_mode:write
+47	worker_pool_threads:4
 --
 -- Test insert from detached fiber
 --
diff --git a/test/box/admin.result b/test/box/admin.result
index 6da53f30..0b233889 100644
--- a/test/box/admin.result
+++ b/test/box/admin.result
@@ -32,6 +32,8 @@ cfg_filter(box.cfg)
     - 2
   - - checkpoint_interval
     - 3600
+  - - checkpoint_wal_threshold
+    - 1000000000000000000
   - - coredump
     - false
   - - feedback_enabled
diff --git a/test/box/cfg.result b/test/box/cfg.result
index 01e6bc6b..68465669 100644
--- a/test/box/cfg.result
+++ b/test/box/cfg.result
@@ -20,6 +20,8 @@ cfg_filter(box.cfg)
     - 2
   - - checkpoint_interval
     - 3600
+  - - checkpoint_wal_threshold
+    - 1000000000000000000
   - - coredump
     - false
   - - feedback_enabled
@@ -119,6 +121,8 @@ cfg_filter(box.cfg)
     - 2
   - - checkpoint_interval
     - 3600
+  - - checkpoint_wal_threshold
+    - 1000000000000000000
   - - coredump
     - false
   - - feedback_enabled
diff --git a/test/xlog/checkpoint_threshold.result b/test/xlog/checkpoint_threshold.result
new file mode 100644
index 00000000..f1afec7c
--- /dev/null
+++ b/test/xlog/checkpoint_threshold.result
@@ -0,0 +1,115 @@
+test_run = require('test_run').new()
+---
+...
+fiber = require('fiber')
+---
+...
+digest = require('digest')
+---
+...
+default_threshold = box.cfg.checkpoint_wal_threshold
+---
+...
+threshold = 10 * 1024
+---
+...
+box.cfg{checkpoint_wal_threshold = threshold}
+---
+...
+s = box.schema.space.create('test')
+---
+...
+_ = s:create_index('pk')
+---
+...
+box.snapshot()
+---
+- ok
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+function put(size)
+    s:auto_increment{digest.urandom(size)}
+end;
+---
+...
+function wait_checkpoint(signature)
+    signature = signature or box.info.signature
+    return test_run:wait_cond(function()
+        local checkpoints = box.info.gc().checkpoints
+        return signature == checkpoints[#checkpoints].signature
+    end, 10)
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+--
+-- Check that checkpointing is triggered automatically once
+-- the size of WAL files written since the last checkpoint
+-- exceeds box.cfg.checkpoint_wal_threshold (gh-1082).
+--
+for i = 1, 3 do put(threshold / 3) end
+---
+...
+wait_checkpoint()
+---
+- true
+...
+for i = 1, 5 do put(threshold / 5) end
+---
+...
+wait_checkpoint()
+---
+- true
+...
+--
+-- Check that WAL rows written while a checkpoint was created
+-- are accounted as written after the checkpoint.
+--
+box.error.injection.set('ERRINJ_SNAP_COMMIT_DELAY', true)
+---
+- ok
+...
+-- This should trigger checkpointing, which will take quite
+-- a while due to the injected delay.
+for i = 1, 5 do put(threshold / 5) end
+---
+...
+fiber.sleep(0)
+---
+...
+-- Remember the future checkpoint signature.
+signature = box.info.signature
+---
+...
+-- Insert some records while the checkpoint is created.
+for i = 1, 4 do put(threshold / 5) end
+---
+...
+-- Disable the delay and wait for checkpointing to complete.
+box.error.injection.set('ERRINJ_SNAP_COMMIT_DELAY', false)
+---
+- ok
+...
+wait_checkpoint(signature)
+---
+- true
+...
+-- Check that insertion of one more record triggers another
+-- checkpoint, because it sums up with records written while
+-- the previous checkpoint was created.
+put(threshold / 5)
+---
+...
+wait_checkpoint()
+---
+- true
+...
+box.cfg{checkpoint_wal_threshold = default_threshold}
+---
+...
diff --git a/test/xlog/checkpoint_threshold.test.lua b/test/xlog/checkpoint_threshold.test.lua
new file mode 100644
index 00000000..cd55de09
--- /dev/null
+++ b/test/xlog/checkpoint_threshold.test.lua
@@ -0,0 +1,63 @@
+test_run = require('test_run').new()
+fiber = require('fiber')
+digest = require('digest')
+
+default_threshold = box.cfg.checkpoint_wal_threshold
+threshold = 10 * 1024
+box.cfg{checkpoint_wal_threshold = threshold}
+
+s = box.schema.space.create('test')
+_ = s:create_index('pk')
+box.snapshot()
+
+test_run:cmd("setopt delimiter ';'")
+function put(size)
+    s:auto_increment{digest.urandom(size)}
+end;
+function wait_checkpoint(signature)
+    signature = signature or box.info.signature
+    return test_run:wait_cond(function()
+        local checkpoints = box.info.gc().checkpoints
+        return signature == checkpoints[#checkpoints].signature
+    end, 10)
+end;
+test_run:cmd("setopt delimiter ''");
+
+--
+-- Check that checkpointing is triggered automatically once
+-- the size of WAL files written since the last checkpoint
+-- exceeds box.cfg.checkpoint_wal_threshold (gh-1082).
+--
+for i = 1, 3 do put(threshold / 3) end
+wait_checkpoint()
+for i = 1, 5 do put(threshold / 5) end
+wait_checkpoint()
+
+--
+-- Check that WAL rows written while a checkpoint was created
+-- are accounted as written after the checkpoint.
+--
+box.error.injection.set('ERRINJ_SNAP_COMMIT_DELAY', true)
+
+-- This should trigger checkpointing, which will take quite
+-- a while due to the injected delay.
+for i = 1, 5 do put(threshold / 5) end
+fiber.sleep(0)
+
+-- Remember the future checkpoint signature.
+signature = box.info.signature
+
+-- Insert some records while the checkpoint is created.
+for i = 1, 4 do put(threshold / 5) end
+
+-- Disable the delay and wait for checkpointing to complete.
+box.error.injection.set('ERRINJ_SNAP_COMMIT_DELAY', false)
+wait_checkpoint(signature)
+
+-- Check that insertion of one more record triggers another
+-- checkpoint, because it sums up with records written while
+-- the previous checkpoint was created.
+put(threshold / 5)
+wait_checkpoint()
+
+box.cfg{checkpoint_wal_threshold = default_threshold}
diff --git a/test/xlog/suite.ini b/test/xlog/suite.ini
index 4f82295d..4043f370 100644
--- a/test/xlog/suite.ini
+++ b/test/xlog/suite.ini
@@ -4,7 +4,7 @@ description = tarantool write ahead log tests
 script = xlog.lua
 disabled = snap_io_rate.test.lua upgrade.test.lua
 valgrind_disabled =
-release_disabled = errinj.test.lua panic_on_lsn_gap.test.lua panic_on_broken_lsn.test.lua
+release_disabled = errinj.test.lua panic_on_lsn_gap.test.lua panic_on_broken_lsn.test.lua checkpoint_threshold.test.lua
 config = suite.cfg
 use_unix_sockets = True
 long_run = snap_io_rate.test.lua
-- 
2.11.0



