[PATCH 9/9] wal: trigger checkpoint if there are too many WALs

Vladimir Davydov vdavydov.dev at gmail.com
Wed Nov 28 19:14:47 MSK 2018


Closes #1082

@TarantoolBot document
Title: Document box.cfg.checkpoint_wal_threshold

Tarantool makes checkpoints every box.cfg.checkpoint_interval seconds
and keeps the last box.cfg.checkpoint_count checkpoints. It also keeps
all intermediate WAL files. Currently, it isn't possible to set a
checkpoint trigger based on the total size of WAL files, which makes it
difficult to estimate the minimum amount of disk space that must be
allotted to a Tarantool instance for storing WALs in order to rule out
ENOSPC errors. For example, under normal conditions a Tarantool
instance may write 1 GB of WAL files every box.cfg.checkpoint_interval
seconds, so one might assume that 1 GB times box.cfg.checkpoint_count
is enough for the WAL partition, but there's no guarantee the instance
won't write 10 GB between checkpoints under extreme load.

So we've agreed to provide users with one more configuration option
that imposes a limit on the total size of WAL files. The new option is
called box.cfg.checkpoint_wal_threshold. Once the configured threshold
is exceeded, the WAL thread notifies the checkpoint daemon that it's
time to make a new checkpoint and delete old WAL files. Note that the
new option only limits the size of WAL files created since the last
checkpoint, because backup WAL files are not needed for recovery and
can be deleted in case of an emergency ENOSPC; for more details, see
tarantool/tarantool#1082, tarantool/tarantool#3397,
tarantool/tarantool#3822.
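
A minimal usage sketch (the threshold is given in bytes; the 1 GB
value below is illustrative, not a recommendation):

    -- Trigger a checkpoint once the WAL files written since the
    -- last checkpoint grow past ~1 GB in total.
    box.cfg{checkpoint_wal_threshold = 1024 * 1024 * 1024}

    -- A value <= 0 (the default) disables the size-based trigger;
    -- only the box.cfg.checkpoint_interval timer remains in effect.
    box.cfg{checkpoint_wal_threshold = 0}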
---
 src/box/box.cc                          |  21 +++++-
 src/box/box.h                           |   1 +
 src/box/lua/cfg.cc                      |  12 ++++
 src/box/lua/load_cfg.lua                |   3 +
 src/box/wal.c                           | 106 ++++++++++++++++++++++++++++--
 src/box/wal.h                           |  25 ++++++-
 test/app-tap/init_script.result         |  87 +++++++++++++------------
 test/box/admin.result                   |   2 +
 test/box/cfg.result                     |   4 ++
 test/xlog/checkpoint_threshold.result   | 112 ++++++++++++++++++++++++++++++++
 test/xlog/checkpoint_threshold.test.lua |  62 ++++++++++++++++++
 test/xlog/suite.ini                     |   2 +-
 12 files changed, 385 insertions(+), 52 deletions(-)
 create mode 100644 test/xlog/checkpoint_threshold.result
 create mode 100644 test/xlog/checkpoint_threshold.test.lua

diff --git a/src/box/box.cc b/src/box/box.cc
index 24ddd941..1af08ea3 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -938,6 +938,13 @@ box_set_checkpoint_interval(void)
 }
 
 void
+box_set_checkpoint_wal_threshold(void)
+{
+	int64_t checkpoint_threshold = cfg_geti64("checkpoint_wal_threshold");
+	wal_set_checkpoint_threshold(checkpoint_threshold);
+}
+
+void
 box_set_checkpoint_count(void)
 {
 	int checkpoint_count = cfg_geti("checkpoint_count");
@@ -2111,6 +2118,17 @@ on_wal_garbage_collection(const struct vclock *vclock)
 	gc_advance(vclock);
 }
 
+static void
+on_wal_checkpoint_threshold(void)
+{
+	if (box_checkpoint_is_in_progress)
+		return;
+
+	say_info("WAL threshold exceeded, triggering checkpoint");
+	next_checkpoint_time = ev_monotonic_now(loop());
+	fiber_wakeup(checkpoint_daemon);
+}
+
 void
 box_init(void)
 {
@@ -2225,7 +2243,8 @@ box_cfg_xc(void)
 	enum wal_mode wal_mode = box_check_wal_mode(cfg_gets("wal_mode"));
 	if (wal_init(wal_mode, cfg_gets("wal_dir"), wal_max_rows,
 		     wal_max_size, &INSTANCE_UUID, &replicaset.vclock,
-		     &checkpoint->vclock, on_wal_garbage_collection) != 0) {
+		     &checkpoint->vclock, on_wal_garbage_collection,
+		     on_wal_checkpoint_threshold) != 0) {
 		diag_raise();
 	}
 
diff --git a/src/box/box.h b/src/box/box.h
index 9bde583d..eb037d07 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -200,6 +200,7 @@ void box_set_too_long_threshold(void);
 void box_set_readahead(void);
 void box_set_checkpoint_count(void);
 void box_set_checkpoint_interval(void);
+void box_set_checkpoint_wal_threshold(void);
 void box_set_memtx_memory(void);
 void box_set_memtx_max_tuple_size(void);
 void box_set_vinyl_memory(void);
diff --git a/src/box/lua/cfg.cc b/src/box/lua/cfg.cc
index 4f08c78e..4884ce01 100644
--- a/src/box/lua/cfg.cc
+++ b/src/box/lua/cfg.cc
@@ -176,6 +176,17 @@ lbox_cfg_set_checkpoint_interval(struct lua_State *L)
 }
 
 static int
+lbox_cfg_set_checkpoint_wal_threshold(struct lua_State *L)
+{
+	try {
+		box_set_checkpoint_wal_threshold();
+	} catch (Exception *) {
+		luaT_error(L);
+	}
+	return 0;
+}
+
+static int
 lbox_cfg_set_read_only(struct lua_State *L)
 {
 	try {
@@ -352,6 +363,7 @@ box_lua_cfg_init(struct lua_State *L)
 		{"cfg_set_snap_io_rate_limit", lbox_cfg_set_snap_io_rate_limit},
 		{"cfg_set_checkpoint_count", lbox_cfg_set_checkpoint_count},
 		{"cfg_set_checkpoint_interval", lbox_cfg_set_checkpoint_interval},
+		{"cfg_set_checkpoint_wal_threshold", lbox_cfg_set_checkpoint_wal_threshold},
 		{"cfg_set_read_only", lbox_cfg_set_read_only},
 		{"cfg_set_memtx_memory", lbox_cfg_set_memtx_memory},
 		{"cfg_set_memtx_max_tuple_size", lbox_cfg_set_memtx_max_tuple_size},
diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua
index a9e6fe07..4f887b1a 100644
--- a/src/box/lua/load_cfg.lua
+++ b/src/box/lua/load_cfg.lua
@@ -68,6 +68,7 @@ local default_cfg = {
     read_only           = false,
     hot_standby         = false,
     checkpoint_interval = 3600,
+    checkpoint_wal_threshold = 0,
     checkpoint_count    = 2,
     worker_pool_threads = 4,
     replication_timeout = 1,
@@ -128,6 +129,7 @@ local template_cfg = {
     username            = 'string',
     coredump            = 'boolean',
     checkpoint_interval = 'number',
+    checkpoint_wal_threshold = 'number',
     checkpoint_count    = 'number',
     read_only           = 'boolean',
     hot_standby         = 'boolean',
@@ -228,6 +230,7 @@ local dynamic_cfg = {
     vinyl_timeout           = private.cfg_set_vinyl_timeout,
     checkpoint_count        = private.cfg_set_checkpoint_count,
     checkpoint_interval     = private.cfg_set_checkpoint_interval,
+    checkpoint_wal_threshold = private.cfg_set_checkpoint_wal_threshold,
     worker_pool_threads     = private.cfg_set_worker_pool_threads,
     feedback_enabled        = private.feedback_daemon.set_feedback_params,
     feedback_host           = private.feedback_daemon.set_feedback_params,
diff --git a/src/box/wal.c b/src/box/wal.c
index bbb8376c..1933c6b4 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -92,6 +92,7 @@ struct wal_writer
 	struct journal base;
 	/* ----------------- tx ------------------- */
 	wal_on_garbage_collection_f on_garbage_collection;
+	wal_on_checkpoint_threshold_f on_checkpoint_threshold;
 	/**
 	 * The rollback queue. An accumulator for all requests
 	 * that need to be rolled back. Also acts as a valve
@@ -126,6 +127,25 @@ struct wal_writer
 	 * recover from it even if it is running out of disk space.
 	 */
 	struct vclock checkpoint_vclock;
+	/**
+	 * Total size of WAL files written since the last
+	 * checkpoint.
+	 */
+	int64_t checkpoint_wal_size;
+	/**
+	 * If greater than 0, this variable sets a limit on the
+	 * total size of WAL files written since the last checkpoint.
+	 * Exceeding it will trigger auto checkpointing in tx.
+	 */
+	int64_t checkpoint_threshold;
+	/**
+	 * Set if the WAL thread has signalled the TX thread that
+	 * the checkpoint threshold has been exceeded, cleared
+	 * on checkpoint completion. Needed so as not to invoke
+	 * the tx callback over and over again while checkpointing
+	 * is in progress.
+	 */
+	bool checkpoint_threshold_signalled;
 	/** The current WAL file. */
 	struct xlog current_wal;
 	/**
@@ -154,6 +174,11 @@ struct wal_msg {
 	 */
 	struct stailq rollback;
 	/**
+	 * Set if the checkpoint threshold was exceeded while
+	 * processing this request.
+	 */
+	bool checkpoint_threshold_exceeded;
+	/**
 	 * Set if the WAL thread ran out of disk space while
 	 * processing this request and had to delete some old
 	 * WAL files.
@@ -201,6 +226,7 @@ wal_msg_create(struct wal_msg *batch)
 	cmsg_init(&batch->base, wal_request_route);
 	batch->approx_len = 0;
 	batch->gc_executed = false;
+	batch->checkpoint_threshold_exceeded = false;
 	stailq_create(&batch->commit);
 	stailq_create(&batch->rollback);
 }
@@ -282,6 +308,8 @@ tx_schedule_commit(struct cmsg *msg)
 
 	if (batch->gc_executed)
 		writer->on_garbage_collection(&batch->gc_vclock);
+	if (batch->checkpoint_threshold_exceeded)
+		writer->on_checkpoint_threshold();
 }
 
 static void
@@ -313,7 +341,8 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
 		  int64_t wal_max_size, const struct tt_uuid *instance_uuid,
 		  const struct vclock *vclock,
 		  const struct vclock *checkpoint_vclock,
-		  wal_on_garbage_collection_f on_garbage_collection)
+		  wal_on_garbage_collection_f on_garbage_collection,
+		  wal_on_checkpoint_threshold_f on_checkpoint_threshold)
 {
 	writer->wal_mode = wal_mode;
 	writer->wal_max_rows = wal_max_rows;
@@ -329,11 +358,16 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
 	stailq_create(&writer->rollback);
 	cmsg_init(&writer->in_rollback, NULL);
 
+	writer->checkpoint_wal_size = 0;
+	writer->checkpoint_threshold = 0;
+	writer->checkpoint_threshold_signalled = false;
+
 	vclock_copy(&writer->vclock, vclock);
 	vclock_copy(&writer->checkpoint_vclock, checkpoint_vclock);
 	rlist_create(&writer->watchers);
 
 	writer->on_garbage_collection = on_garbage_collection;
+	writer->on_checkpoint_threshold = on_checkpoint_threshold;
 }
 
 /** Destroy a WAL writer structure. */
@@ -439,14 +473,16 @@ int
 wal_init(enum wal_mode wal_mode, const char *wal_dirname, int64_t wal_max_rows,
 	 int64_t wal_max_size, const struct tt_uuid *instance_uuid,
 	 const struct vclock *vclock, const struct vclock *checkpoint_vclock,
-	 wal_on_garbage_collection_f on_garbage_collection)
+	 wal_on_garbage_collection_f on_garbage_collection,
+	 wal_on_checkpoint_threshold_f on_checkpoint_threshold)
 {
 	assert(wal_max_rows > 1);
 
 	struct wal_writer *writer = &wal_writer_singleton;
 	wal_writer_create(writer, wal_mode, wal_dirname, wal_max_rows,
 			  wal_max_size, instance_uuid, vclock,
-			  checkpoint_vclock, on_garbage_collection);
+			  checkpoint_vclock, on_garbage_collection,
+			  on_checkpoint_threshold);
 
 	/*
 	 * Scan the WAL directory to build an index of all
@@ -517,6 +553,7 @@ wal_begin_checkpoint_f(struct cbus_call_msg *data)
 		 */
 	}
 	vclock_copy(&msg->vclock, &writer->vclock);
+	msg->wal_size = writer->checkpoint_wal_size;
 	return 0;
 }
 
@@ -526,6 +563,7 @@ wal_begin_checkpoint(struct wal_checkpoint *checkpoint)
 	struct wal_writer *writer = &wal_writer_singleton;
 	if (writer->wal_mode == WAL_NONE) {
 		vclock_copy(&checkpoint->vclock, &writer->vclock);
+		checkpoint->wal_size = 0;
 		return 0;
 	}
 	if (!stailq_empty(&writer->rollback)) {
@@ -554,7 +592,20 @@ wal_commit_checkpoint_f(struct cbus_call_msg *data)
 {
 	struct wal_checkpoint *msg = (struct wal_checkpoint *) data;
 	struct wal_writer *writer = &wal_writer_singleton;
+	/*
+	 * Now, once the checkpoint has been created, we can update
+	 * the WAL's version of the last checkpoint vclock and
+	 * reset the size of WAL files written since the last
+	 * checkpoint. Note, since new WAL records may have been
+	 * written while the checkpoint was created, we subtract
+	 * the value of checkpoint_wal_size observed at the time
+	 * when checkpointing started from the current value
+	 * rather than just setting it to 0.
+	 */
 	vclock_copy(&writer->checkpoint_vclock, &msg->vclock);
+	assert(writer->checkpoint_wal_size >= msg->wal_size);
+	writer->checkpoint_wal_size -= msg->wal_size;
+	writer->checkpoint_threshold_signalled = false;
 	return 0;
 }
 
@@ -573,6 +624,37 @@ wal_commit_checkpoint(struct wal_checkpoint *checkpoint)
 	fiber_set_cancellable(cancellable);
 }
 
+struct wal_set_checkpoint_threshold_msg
+{
+	struct cbus_call_msg base;
+	int64_t checkpoint_threshold;
+};
+
+static int
+wal_set_checkpoint_threshold_f(struct cbus_call_msg *data)
+{
+	struct wal_writer *writer = &wal_writer_singleton;
+	struct wal_set_checkpoint_threshold_msg *msg;
+	msg = (struct wal_set_checkpoint_threshold_msg *)data;
+	writer->checkpoint_threshold = msg->checkpoint_threshold;
+	return 0;
+}
+
+void
+wal_set_checkpoint_threshold(int64_t checkpoint_threshold)
+{
+	struct wal_writer *writer = &wal_writer_singleton;
+	if (writer->wal_mode == WAL_NONE)
+		return;
+	struct wal_set_checkpoint_threshold_msg msg;
+	msg.checkpoint_threshold = checkpoint_threshold;
+	bool cancellable = fiber_set_cancellable(false);
+	cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe,
+		  &msg.base, wal_set_checkpoint_threshold_f, NULL,
+		  TIMEOUT_INFINITY);
+	fiber_set_cancellable(cancellable);
+}
+
 struct wal_gc_msg
 {
 	struct cbus_call_msg base;
@@ -859,23 +941,35 @@ wal_write_to_disk(struct cmsg *msg)
 	/*
 	 * Iterate over requests (transactions)
 	 */
+	int rc;
 	struct journal_entry *entry;
 	struct stailq_entry *last_committed = NULL;
 	stailq_foreach_entry(entry, &wal_msg->commit, fifo) {
 		wal_assign_lsn(writer, entry->rows, entry->rows + entry->n_rows);
 		entry->res = vclock_sum(&writer->vclock);
-		int rc = xlog_write_entry(l, entry);
+		rc = xlog_write_entry(l, entry);
 		if (rc < 0)
 			goto done;
-		if (rc > 0)
+		if (rc > 0) {
+			writer->checkpoint_wal_size += rc;
 			last_committed = &entry->fifo;
+		}
 		/* rc == 0: the write is buffered in xlog_tx */
 	}
-	if (xlog_flush(l) < 0)
+	rc = xlog_flush(l);
+	if (rc < 0)
 		goto done;
 
+	writer->checkpoint_wal_size += rc;
 	last_committed = stailq_last(&wal_msg->commit);
 
+	if (writer->checkpoint_threshold > 0 &&
+	    !writer->checkpoint_threshold_signalled &&
+	    writer->checkpoint_wal_size > writer->checkpoint_threshold) {
+		writer->checkpoint_threshold_signalled = true;
+		wal_msg->checkpoint_threshold_exceeded = true;
+	}
+
 done:
 	error = diag_last_error(diag_get());
 	if (error) {
diff --git a/src/box/wal.h b/src/box/wal.h
index 3f0fae07..1ba8bf8b 100644
--- a/src/box/wal.h
+++ b/src/box/wal.h
@@ -61,6 +61,13 @@ extern "C" {
  */
 typedef void (*wal_on_garbage_collection_f)(const struct vclock *vclock);
 
+/**
+ * Callback invoked on behalf of the tx thread upon request
+ * completion when the total size of WAL files written since
+ * the last checkpoint exceeds the configured threshold.
+ */
+typedef void (*wal_on_checkpoint_threshold_f)(void);
+
 void
 wal_thread_start();
 
@@ -68,7 +75,8 @@ int
 wal_init(enum wal_mode wal_mode, const char *wal_dirname, int64_t wal_max_rows,
 	 int64_t wal_max_size, const struct tt_uuid *instance_uuid,
 	 const struct vclock *vclock, const struct vclock *checkpoint_vclock,
-	 wal_on_garbage_collection_f on_garbage_collection);
+	 wal_on_garbage_collection_f on_garbage_collection,
+	 wal_on_checkpoint_threshold_f on_checkpoint_threshold);
 
 void
 wal_thread_stop();
@@ -168,6 +176,12 @@ struct wal_checkpoint {
 	 * identify the new checkpoint.
 	 */
 	struct vclock vclock;
+	/**
+	 * Size of WAL files written since the last checkpoint.
+	 * Used to reset the corresponding WAL counter upon
+	 * successful checkpoint creation.
+	 */
+	int64_t wal_size;
 };
 
 /**
@@ -190,6 +204,15 @@ void
 wal_commit_checkpoint(struct wal_checkpoint *checkpoint);
 
 /**
+ * Limit the size of WAL files written since the last checkpoint.
+ * Exceeding the limit will trigger automatic checkpointing in tx.
+ * If @checkpoint_threshold is less than or equal to 0, automatic
+ * checkpointing will be disabled.
+ */
+void
+wal_set_checkpoint_threshold(int64_t checkpoint_threshold);
+
+/**
  * Remove WAL files that are not needed by consumers reading
  * rows at @vclock or newer.
  */
diff --git a/test/app-tap/init_script.result b/test/app-tap/init_script.result
index b03e5159..fd58ae94 100644
--- a/test/app-tap/init_script.result
+++ b/test/app-tap/init_script.result
@@ -6,49 +6,50 @@ box.cfg
 1	background:false
 2	checkpoint_count:2
 3	checkpoint_interval:3600
-4	coredump:false
-5	feedback_enabled:true
-6	feedback_host:https://feedback.tarantool.io
-7	feedback_interval:3600
-8	force_recovery:false
-9	hot_standby:false
-10	listen:port
-11	log:tarantool.log
-12	log_format:plain
-13	log_level:5
-14	memtx_dir:.
-15	memtx_max_tuple_size:1048576
-16	memtx_memory:107374182
-17	memtx_min_tuple_size:16
-18	net_msg_max:768
-19	pid_file:box.pid
-20	read_only:false
-21	readahead:16320
-22	replication_connect_timeout:30
-23	replication_skip_conflict:false
-24	replication_sync_lag:10
-25	replication_sync_timeout:300
-26	replication_timeout:1
-27	rows_per_wal:500000
-28	slab_alloc_factor:1.05
-29	too_long_threshold:0.5
-30	vinyl_bloom_fpr:0.05
-31	vinyl_cache:134217728
-32	vinyl_dir:.
-33	vinyl_max_tuple_size:1048576
-34	vinyl_memory:134217728
-35	vinyl_page_size:8192
-36	vinyl_range_size:1073741824
-37	vinyl_read_threads:1
-38	vinyl_run_count_per_level:2
-39	vinyl_run_size_ratio:3.5
-40	vinyl_timeout:60
-41	vinyl_write_threads:4
-42	wal_dir:.
-43	wal_dir_rescan_delay:2
-44	wal_max_size:268435456
-45	wal_mode:write
-46	worker_pool_threads:4
+4	checkpoint_wal_threshold:0
+5	coredump:false
+6	feedback_enabled:true
+7	feedback_host:https://feedback.tarantool.io
+8	feedback_interval:3600
+9	force_recovery:false
+10	hot_standby:false
+11	listen:port
+12	log:tarantool.log
+13	log_format:plain
+14	log_level:5
+15	memtx_dir:.
+16	memtx_max_tuple_size:1048576
+17	memtx_memory:107374182
+18	memtx_min_tuple_size:16
+19	net_msg_max:768
+20	pid_file:box.pid
+21	read_only:false
+22	readahead:16320
+23	replication_connect_timeout:30
+24	replication_skip_conflict:false
+25	replication_sync_lag:10
+26	replication_sync_timeout:300
+27	replication_timeout:1
+28	rows_per_wal:500000
+29	slab_alloc_factor:1.05
+30	too_long_threshold:0.5
+31	vinyl_bloom_fpr:0.05
+32	vinyl_cache:134217728
+33	vinyl_dir:.
+34	vinyl_max_tuple_size:1048576
+35	vinyl_memory:134217728
+36	vinyl_page_size:8192
+37	vinyl_range_size:1073741824
+38	vinyl_read_threads:1
+39	vinyl_run_count_per_level:2
+40	vinyl_run_size_ratio:3.5
+41	vinyl_timeout:60
+42	vinyl_write_threads:4
+43	wal_dir:.
+44	wal_dir_rescan_delay:2
+45	wal_max_size:268435456
+46	wal_mode:write
+47	worker_pool_threads:4
 --
 -- Test insert from detached fiber
 --
diff --git a/test/box/admin.result b/test/box/admin.result
index 6da53f30..80e220cc 100644
--- a/test/box/admin.result
+++ b/test/box/admin.result
@@ -32,6 +32,8 @@ cfg_filter(box.cfg)
     - 2
   - - checkpoint_interval
     - 3600
+  - - checkpoint_wal_threshold
+    - 0
   - - coredump
     - false
   - - feedback_enabled
diff --git a/test/box/cfg.result b/test/box/cfg.result
index 01e6bc6b..248a7e8c 100644
--- a/test/box/cfg.result
+++ b/test/box/cfg.result
@@ -20,6 +20,8 @@ cfg_filter(box.cfg)
     - 2
   - - checkpoint_interval
     - 3600
+  - - checkpoint_wal_threshold
+    - 0
   - - coredump
     - false
   - - feedback_enabled
@@ -119,6 +121,8 @@ cfg_filter(box.cfg)
     - 2
   - - checkpoint_interval
     - 3600
+  - - checkpoint_wal_threshold
+    - 0
   - - coredump
     - false
   - - feedback_enabled
diff --git a/test/xlog/checkpoint_threshold.result b/test/xlog/checkpoint_threshold.result
new file mode 100644
index 00000000..bad5e2be
--- /dev/null
+++ b/test/xlog/checkpoint_threshold.result
@@ -0,0 +1,112 @@
+test_run = require('test_run').new()
+---
+...
+fiber = require('fiber')
+---
+...
+digest = require('digest')
+---
+...
+threshold = 10 * 1024
+---
+...
+box.cfg{checkpoint_wal_threshold = threshold}
+---
+...
+s = box.schema.space.create('test')
+---
+...
+_ = s:create_index('pk')
+---
+...
+box.snapshot()
+---
+- ok
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+function put(size)
+    s:auto_increment{digest.urandom(size)}
+end;
+---
+...
+function wait_checkpoint(signature)
+    signature = signature or box.info.signature
+    return test_run:wait_cond(function()
+        local checkpoints = box.info.gc().checkpoints
+        return signature == checkpoints[#checkpoints].signature
+    end, 10)
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+--
+-- Check that checkpointing is triggered automatically once
+-- the size of WAL files written since the last checkpoint
+-- exceeds box.cfg.checkpoint_wal_threshold (gh-1082).
+--
+for i = 1, 3 do put(threshold / 3) end
+---
+...
+wait_checkpoint()
+---
+- true
+...
+for i = 1, 5 do put(threshold / 5) end
+---
+...
+wait_checkpoint()
+---
+- true
+...
+--
+-- Check that WAL rows written while a checkpoint was created
+-- are accounted as written after the checkpoint.
+--
+box.error.injection.set('ERRINJ_SNAP_COMMIT_DELAY', true)
+---
+- ok
+...
+-- This should trigger checkpointing, which will take quite
+-- a while due to the injected delay.
+for i = 1, 5 do put(threshold / 5) end
+---
+...
+fiber.sleep(0)
+---
+...
+-- Remember the future checkpoint signature.
+signature = box.info.signature
+---
+...
+-- Insert some records while the checkpoint is created.
+for i = 1, 4 do put(threshold / 5) end
+---
+...
+-- Disable the delay and wait for checkpointing to complete.
+box.error.injection.set('ERRINJ_SNAP_COMMIT_DELAY', false)
+---
+- ok
+...
+wait_checkpoint(signature)
+---
+- true
+...
+-- Check that insertion of one more record triggers another
+-- checkpoint, because it sums up with records written while
+-- the previous checkpoint was created.
+put(threshold / 5)
+---
+...
+wait_checkpoint()
+---
+- true
+...
+box.cfg{checkpoint_wal_threshold = 0}
+---
+...
diff --git a/test/xlog/checkpoint_threshold.test.lua b/test/xlog/checkpoint_threshold.test.lua
new file mode 100644
index 00000000..7057d835
--- /dev/null
+++ b/test/xlog/checkpoint_threshold.test.lua
@@ -0,0 +1,62 @@
+test_run = require('test_run').new()
+fiber = require('fiber')
+digest = require('digest')
+
+threshold = 10 * 1024
+box.cfg{checkpoint_wal_threshold = threshold}
+
+s = box.schema.space.create('test')
+_ = s:create_index('pk')
+box.snapshot()
+
+test_run:cmd("setopt delimiter ';'")
+function put(size)
+    s:auto_increment{digest.urandom(size)}
+end;
+function wait_checkpoint(signature)
+    signature = signature or box.info.signature
+    return test_run:wait_cond(function()
+        local checkpoints = box.info.gc().checkpoints
+        return signature == checkpoints[#checkpoints].signature
+    end, 10)
+end;
+test_run:cmd("setopt delimiter ''");
+
+--
+-- Check that checkpointing is triggered automatically once
+-- the size of WAL files written since the last checkpoint
+-- exceeds box.cfg.checkpoint_wal_threshold (gh-1082).
+--
+for i = 1, 3 do put(threshold / 3) end
+wait_checkpoint()
+for i = 1, 5 do put(threshold / 5) end
+wait_checkpoint()
+
+--
+-- Check that WAL rows written while a checkpoint was created
+-- are accounted as written after the checkpoint.
+--
+box.error.injection.set('ERRINJ_SNAP_COMMIT_DELAY', true)
+
+-- This should trigger checkpointing, which will take quite
+-- a while due to the injected delay.
+for i = 1, 5 do put(threshold / 5) end
+fiber.sleep(0)
+
+-- Remember the future checkpoint signature.
+signature = box.info.signature
+
+-- Insert some records while the checkpoint is created.
+for i = 1, 4 do put(threshold / 5) end
+
+-- Disable the delay and wait for checkpointing to complete.
+box.error.injection.set('ERRINJ_SNAP_COMMIT_DELAY', false)
+wait_checkpoint(signature)
+
+-- Check that insertion of one more record triggers another
+-- checkpoint, because it sums up with records written while
+-- the previous checkpoint was created.
+put(threshold / 5)
+wait_checkpoint()
+
+box.cfg{checkpoint_wal_threshold = 0}
diff --git a/test/xlog/suite.ini b/test/xlog/suite.ini
index 4f82295d..4043f370 100644
--- a/test/xlog/suite.ini
+++ b/test/xlog/suite.ini
@@ -4,7 +4,7 @@ description = tarantool write ahead log tests
 script = xlog.lua
 disabled = snap_io_rate.test.lua upgrade.test.lua
 valgrind_disabled =
-release_disabled = errinj.test.lua panic_on_lsn_gap.test.lua panic_on_broken_lsn.test.lua
+release_disabled = errinj.test.lua panic_on_lsn_gap.test.lua panic_on_broken_lsn.test.lua checkpoint_threshold.test.lua
 config = suite.cfg
 use_unix_sockets = True
 long_run = snap_io_rate.test.lua
-- 
2.11.0



