Tarantool development patches archive
* [PATCH 0/9] Allow to limit size of WAL files
@ 2018-11-28 16:14 Vladimir Davydov
  2018-11-28 16:14 ` [PATCH 1/9] wal: separate checkpoint and flush paths Vladimir Davydov
                   ` (8 more replies)
  0 siblings, 9 replies; 28+ messages in thread
From: Vladimir Davydov @ 2018-11-28 16:14 UTC
  To: kostja; +Cc: tarantool-patches

Tarantool makes a checkpoint every box.cfg.checkpoint_interval seconds
and keeps the last box.cfg.checkpoint_count checkpoints. It also keeps
all intermediate WAL files. Currently, it isn't possible to set a
checkpoint trigger based on the total size of WAL files, which makes it
difficult to estimate the minimal amount of disk space that needs to be
allotted to a Tarantool instance for storing WALs to eliminate the
possibility of ENOSPC errors. For example, under normal conditions a
Tarantool instance may write 1 GB of WAL files every
box.cfg.checkpoint_interval seconds, so one may assume that 1 GB times
box.cfg.checkpoint_count should be enough for the WAL partition, but
there's no guarantee it won't write 10 GB between checkpoints when the
load is extreme.

So we've agreed that we must provide users with one more configuration
option that can be used to impose a limit on the total size of WAL
files. The new option is called box.cfg.checkpoint_wal_threshold. Once
the configured threshold is exceeded, the WAL thread notifies the
checkpoint daemon that it's time to make a new checkpoint and delete
old WAL files. Note that the new option only limits the size of WAL
files created since the last checkpoint, because WAL files kept only for
backup checkpoints are not needed for recovery from the last checkpoint
and can be deleted in case of emergency ENOSPC.
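
The trigger described above can be illustrated with a minimal sketch.
It is not the code from this series: struct wal_size_tracker and its
functions are hypothetical names, and the sketch assumes the WAL thread
simply counts bytes written since the last checkpoint and notifies the
checkpoint daemon once per cycle.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical model of per-checkpoint WAL size accounting. */
struct wal_size_tracker {
	/* Bytes written to WAL files since the last checkpoint. */
	int64_t size_since_checkpoint;
	/* Stands in for box.cfg.checkpoint_wal_threshold. */
	int64_t threshold;
	/* Set once the checkpoint daemon has been notified. */
	bool checkpoint_triggered;
};

static void
wal_size_tracker_create(struct wal_size_tracker *t, int64_t threshold)
{
	assert(threshold > 0);
	t->size_since_checkpoint = 0;
	t->threshold = threshold;
	t->checkpoint_triggered = false;
}

/*
 * Account a WAL write of the given size. Returns true exactly
 * once per checkpoint cycle: on the write that first makes the
 * accumulated size exceed the threshold.
 */
static bool
wal_size_tracker_on_write(struct wal_size_tracker *t, int64_t bytes)
{
	assert(bytes >= 0);
	t->size_since_checkpoint += bytes;
	if (t->checkpoint_triggered ||
	    t->size_since_checkpoint <= t->threshold)
		return false;
	t->checkpoint_triggered = true;
	return true;
}

/* Reset the accounting after a checkpoint has been made. */
static void
wal_size_tracker_on_checkpoint(struct wal_size_tracker *t)
{
	t->size_since_checkpoint = 0;
	t->checkpoint_triggered = false;
}
```

Resetting the counter to zero is a simplification: a real
implementation would presumably have to account for rows written while
the checkpoint was in progress.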

https://github.com/tarantool/tarantool/issues/1082
https://github.com/tarantool/tarantool/commits/dv/gh-1082-wal-checkpoint-threshold

Vladimir Davydov (9):
  wal: separate checkpoint and flush paths
  wal: remove files needed for recovery from backup checkpoints on
    ENOSPC
  recovery: restore garbage collector vclock after restart
  gc: run garbage collection in background
  gc: do not use WAL watcher API for deactivating stale consumers
  wal: simplify watcher API
  box: rewrite checkpoint daemon in C
  wal: pass struct instead of vclock to checkpoint methods
  wal: trigger checkpoint if there are too many WALs

 src/box/CMakeLists.txt                  |   1 -
 src/box/box.cc                          | 164 ++++++++++++++++--
 src/box/box.h                           |   2 +
 src/box/gc.c                            | 115 +++++++-----
 src/box/gc.h                            |  58 ++++---
 src/box/lua/cfg.cc                      |  24 +++
 src/box/lua/checkpoint_daemon.lua       | 136 ---------------
 src/box/lua/init.c                      |   2 -
 src/box/lua/load_cfg.lua                |   5 +-
 src/box/recovery.cc                     |  14 +-
 src/box/recovery.h                      |   5 +-
 src/box/relay.cc                        |  12 +-
 src/box/vinyl.c                         |   5 +-
 src/box/wal.c                           | 298 +++++++++++++++++++++-----------
 src/box/wal.h                           | 110 +++++++-----
 test/app-tap/init_script.result         |  87 +++++-----
 test/box/admin.result                   |   2 +
 test/box/cfg.result                     |   4 +
 test/replication/gc_no_space.result     |  62 +++++--
 test/replication/gc_no_space.test.lua   |  30 +++-
 test/xlog/checkpoint_daemon.result      | 145 ----------------
 test/xlog/checkpoint_daemon.test.lua    |  56 ------
 test/xlog/checkpoint_threshold.result   | 112 ++++++++++++
 test/xlog/checkpoint_threshold.test.lua |  62 +++++++
 test/xlog/panic_on_wal_error.result     |   2 +-
 test/xlog/panic_on_wal_error.test.lua   |   2 +-
 test/xlog/suite.ini                     |   2 +-
 27 files changed, 876 insertions(+), 641 deletions(-)
 delete mode 100644 src/box/lua/checkpoint_daemon.lua
 create mode 100644 test/xlog/checkpoint_threshold.result
 create mode 100644 test/xlog/checkpoint_threshold.test.lua

-- 
2.11.0


* [PATCH 1/9] wal: separate checkpoint and flush paths
  2018-11-28 16:14 [PATCH 0/9] Allow to limit size of WAL files Vladimir Davydov
@ 2018-11-28 16:14 ` Vladimir Davydov
  2018-11-29 16:24   ` [tarantool-patches] " Konstantin Osipov
  2018-11-28 16:14 ` [PATCH 2/9] wal: remove files needed for recovery from backup checkpoints on ENOSPC Vladimir Davydov
                   ` (7 subsequent siblings)
  8 siblings, 1 reply; 28+ messages in thread
From: Vladimir Davydov @ 2018-11-28 16:14 UTC
  To: kostja; +Cc: tarantool-patches

Currently, wal_checkpoint() is used for two purposes. First, to make a
checkpoint (rotate = true). Second, to flush all pending WAL requests
(rotate = false). Since checkpointing has to fail if cascading rollback
is in progress, so does flushing. This is confusing. Let's separate the
two paths.

While we are at it, let's also rewrite WAL checkpointing using cbus_call
instead of cpipe_push as it's a more convenient way of exchanging simple
two-hop messages between two threads.
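
To illustrate the intended contract (flush never fails, checkpoint
fails during rollback), here is a toy model. It is only a sketch:
struct toy_wal and both functions are made-up names, there are no
threads or cbus messages in it, and discarding pending requests during
rollback is a modelling assumption.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Toy stand-in for the WAL writer; not struct wal_writer. */
struct toy_wal {
	/* Requests queued but not yet processed. */
	int64_t pending;
	/* Number of rows written so far. */
	int64_t written;
	/* True while a failed write is being rolled back. */
	bool in_rollback;
	/* How many times the current WAL file was rotated. */
	int rotations;
};

/*
 * Wait for all pending requests to be processed. Cannot fail:
 * in this model pending requests are discarded during rollback
 * instead of being written, but the queue is drained either way.
 */
static void
toy_wal_flush(struct toy_wal *wal)
{
	if (!wal->in_rollback)
		wal->written += wal->pending;
	wal->pending = 0;
}

/*
 * Flush pending requests and rotate the WAL. Returns 0 and
 * stores the checkpoint position in *checkpoint_lsn, or returns
 * -1 without touching anything if rollback is in progress.
 */
static int
toy_wal_checkpoint(struct toy_wal *wal, int64_t *checkpoint_lsn)
{
	if (wal->in_rollback)
		return -1;
	toy_wal_flush(wal);
	wal->rotations++;
	*checkpoint_lsn = wal->written;
	return 0;
}
```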
---
 src/box/box.cc  |  5 ++--
 src/box/vinyl.c |  5 +---
 src/box/wal.c   | 91 +++++++++++++++++++++++++++------------------------------
 src/box/wal.h   | 15 +++++++---
 4 files changed, 57 insertions(+), 59 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index 8d6e966e..5ea2f014 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -2190,10 +2190,9 @@ box_checkpoint()
 		goto end;
 
 	struct vclock vclock;
-	if ((rc = wal_checkpoint(&vclock, true))) {
-		tnt_error(ClientError, ER_CHECKPOINT_ROLLBACK);
+	if ((rc = wal_checkpoint(&vclock)))
 		goto end;
-	}
+
 	rc = engine_commit_checkpoint(&vclock);
 end:
 	if (rc)
diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index 1794489d..05df1329 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -1014,10 +1014,7 @@ vy_abort_writers_for_ddl(struct vy_env *env, struct vy_lsm *lsm)
 	 * Wait for prepared transactions to complete
 	 * (we can't abort them as they reached WAL).
 	 */
-	struct vclock unused;
-	if (wal_checkpoint(&unused, false) != 0)
-		return -1;
-
+	wal_flush();
 	return 0;
 }
 
diff --git a/src/box/wal.c b/src/box/wal.c
index 11aae5fc..bc396ca5 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -461,29 +461,39 @@ wal_thread_stop()
 		wal_writer_destroy(&wal_writer_singleton);
 }
 
+void
+wal_flush(void)
+{
+	struct wal_writer *writer = &wal_writer_singleton;
+	if (writer->wal_mode == WAL_NONE)
+		return;
+	cbus_flush(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe, NULL);
+}
+
 struct wal_checkpoint
 {
-	struct cmsg base;
-	struct vclock *vclock;
-	struct fiber *fiber;
-	bool rotate;
-	int res;
+	struct cbus_call_msg base;
+	struct vclock vclock;
 };
 
-void
-wal_checkpoint_f(struct cmsg *data)
+static int
+wal_checkpoint_f(struct cbus_call_msg *data)
 {
 	struct wal_checkpoint *msg = (struct wal_checkpoint *) data;
 	struct wal_writer *writer = &wal_writer_singleton;
 	if (writer->in_rollback.route != NULL) {
-		/* We're rolling back a failed write. */
-		msg->res = -1;
-		return;
+		/*
+		 * We're rolling back a failed write and so
+		 * can't make a checkpoint - see the comment
+		 * in wal_checkpoint() for the explanation.
+		 */
+		diag_set(ClientError, ER_CHECKPOINT_ROLLBACK);
+		return -1;
 	}
 	/*
 	 * Avoid closing the current WAL if it has no rows (empty).
 	 */
-	if (msg->rotate && xlog_is_open(&writer->current_wal) &&
+	if (xlog_is_open(&writer->current_wal) &&
 	    vclock_sum(&writer->current_wal.meta.vclock) !=
 	    vclock_sum(&writer->vclock)) {
 
@@ -492,53 +502,38 @@ wal_checkpoint_f(struct cmsg *data)
 		 * The next WAL will be created on the first write.
 		 */
 	}
-	vclock_copy(msg->vclock, &writer->vclock);
-}
-
-void
-wal_checkpoint_done_f(struct cmsg *data)
-{
-	struct wal_checkpoint *msg = (struct wal_checkpoint *) data;
-	fiber_wakeup(msg->fiber);
+	vclock_copy(&msg->vclock, &writer->vclock);
+	return 0;
 }
 
 int
-wal_checkpoint(struct vclock *vclock, bool rotate)
+wal_checkpoint(struct vclock *vclock)
 {
 	struct wal_writer *writer = &wal_writer_singleton;
-	if (! stailq_empty(&writer->rollback)) {
-		/*
-		 * The writer rollback queue is not empty,
-		 * roll back this transaction immediately.
-		 * This is to ensure we do not accidentally
-		 * commit a transaction which has seen changes
-		 * that will be rolled back.
-		 */
-		say_error("Aborting transaction %llu during "
-			  "cascading rollback",
-			  vclock_sum(&writer->vclock));
-		return -1;
-	}
 	if (writer->wal_mode == WAL_NONE) {
 		vclock_copy(vclock, &writer->vclock);
 		return 0;
 	}
-	static struct cmsg_hop wal_checkpoint_route[] = {
-		{wal_checkpoint_f, &wal_thread.tx_prio_pipe},
-		{wal_checkpoint_done_f, NULL},
-	};
-	vclock_create(vclock);
+	if (!stailq_empty(&writer->rollback)) {
+		/*
+		 * If cascading rollback is in progress, in-memory
+		 * indexes can contain changes scheduled for rollback.
+		 * If we made a checkpoint, we could write them to
+		 * the snapshot. So we abort checkpointing in this
+		 * case.
+		 */
+		diag_set(ClientError, ER_CHECKPOINT_ROLLBACK);
+		return -1;
+	}
 	struct wal_checkpoint msg;
-	cmsg_init(&msg.base, wal_checkpoint_route);
-	msg.vclock = vclock;
-	msg.fiber = fiber();
-	msg.rotate = rotate;
-	msg.res = 0;
-	cpipe_push(&wal_thread.wal_pipe, &msg.base);
-	fiber_set_cancellable(false);
-	fiber_yield();
-	fiber_set_cancellable(true);
-	return msg.res;
+	bool cancellable = fiber_set_cancellable(false);
+	int rc = cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe,
+			   &msg.base, wal_checkpoint_f, NULL, TIMEOUT_INFINITY);
+	fiber_set_cancellable(cancellable);
+	if (rc != 0)
+		return -1;
+	vclock_copy(vclock, &msg.vclock);
+	return 0;
 }
 
 struct wal_gc_msg
diff --git a/src/box/wal.h b/src/box/wal.h
index e4094b1e..7ca27f1a 100644
--- a/src/box/wal.h
+++ b/src/box/wal.h
@@ -166,13 +166,20 @@ wal_mode();
 
 /**
  * Wait till all pending changes to the WAL are flushed.
- * Rotates the WAL.
- *
- * @param[out] vclock WAL vclock
+ */
+void
+wal_flush(void);
+
+/**
+ * Prepare WAL for checkpointing.
  *
+ * This function flushes all pending changes and rotates the
+ * current WAL. The vclock of the last record written to the
+ * rotated WAL is returned in @vclock. This is the vclock that
+ * is supposed to be used to identify the new checkpoint.
  */
 int
-wal_checkpoint(struct vclock *vclock, bool rotate);
+wal_checkpoint(struct vclock *vclock);
 
 /**
  * Remove WAL files that are not needed by consumers reading
-- 
2.11.0


* [PATCH 2/9] wal: remove files needed for recovery from backup checkpoints on ENOSPC
  2018-11-28 16:14 [PATCH 0/9] Allow to limit size of WAL files Vladimir Davydov
  2018-11-28 16:14 ` [PATCH 1/9] wal: separate checkpoint and flush paths Vladimir Davydov
@ 2018-11-28 16:14 ` Vladimir Davydov
  2018-11-29 16:31   ` [tarantool-patches] " Konstantin Osipov
  2018-11-28 16:14 ` [PATCH 3/9] recovery: restore garbage collector vclock after restart Vladimir Davydov
                   ` (6 subsequent siblings)
  8 siblings, 1 reply; 28+ messages in thread
From: Vladimir Davydov @ 2018-11-28 16:14 UTC
  To: kostja; +Cc: tarantool-patches

Tarantool always keeps the box.cfg.checkpoint_count latest checkpoints.
For the sake of redundancy, it also never deletes WAL files needed for
recovery from any of them, even if it gets ENOSPC while trying to write
to WAL. This patch changes that behavior: now the WAL thread is allowed
to delete WAL files needed only for backup checkpoints in case of
emergency ENOSPC. After all, that's better than stopping operation.
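
The new policy can be sketched as follows. This is an illustration
under assumptions, not the patch itself: struct toy_wal_dir is a
made-up model where each WAL file is identified by a signature (the
vclock sum at which the file starts) and file i holds the rows between
its own signature and the next file's signature.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

enum { TOY_WAL_DIR_MAX = 16 };

/* Toy WAL directory: file signatures in ascending order. */
struct toy_wal_dir {
	int64_t signature[TOY_WAL_DIR_MAX];
	int count;
};

/*
 * Called on ENOSPC. Remove the oldest WAL file unless it is
 * needed to recover from the last checkpoint. The oldest file
 * is not needed only if the next file starts at or before the
 * checkpoint signature, that is if every row in the oldest file
 * is already covered by the checkpoint. Returns true if a file
 * was removed.
 */
static bool
toy_wal_dir_remove_oldest(struct toy_wal_dir *dir,
			  int64_t checkpoint_signature)
{
	assert(dir->count <= TOY_WAL_DIR_MAX);
	if (dir->count < 2)
		return false;
	if (dir->signature[1] > checkpoint_signature)
		return false;
	memmove(&dir->signature[0], &dir->signature[1],
		(dir->count - 1) * sizeof(dir->signature[0]));
	dir->count--;
	return true;
}
```

With files starting at 0, 10, 20 and 30 and the last checkpoint at 20,
two calls succeed and the third is refused, because the file starting
at 20 holds rows written after the checkpoint.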

Closes #3822
---
 src/box/box.cc                        | 17 +++++------
 src/box/gc.c                          | 17 ++++++-----
 src/box/gc.h                          | 14 ---------
 src/box/wal.c                         | 54 ++++++++++++++++++++++++-----------
 src/box/wal.h                         | 22 +++++++-------
 test/replication/gc_no_space.result   | 47 ++++++++++++++++++++++++++----
 test/replication/gc_no_space.test.lua | 23 +++++++++++----
 7 files changed, 127 insertions(+), 67 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index 5ea2f014..72788f82 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -2104,6 +2104,8 @@ box_cfg_xc(void)
 		/* Bootstrap a new master */
 		bootstrap(&instance_uuid, &replicaset_uuid,
 			  &is_bootstrap_leader);
+		checkpoint = gc_last_checkpoint();
+		assert(checkpoint != NULL);
 	}
 	fiber_gc();
 
@@ -2117,16 +2119,13 @@ box_cfg_xc(void)
 		}
 	}
 
-	struct gc_checkpoint *first_checkpoint = gc_first_checkpoint();
-	assert(first_checkpoint != NULL);
-
 	/* Start WAL writer */
 	int64_t wal_max_rows = box_check_wal_max_rows(cfg_geti64("rows_per_wal"));
 	int64_t wal_max_size = box_check_wal_max_size(cfg_geti64("wal_max_size"));
 	enum wal_mode wal_mode = box_check_wal_mode(cfg_gets("wal_mode"));
 	if (wal_init(wal_mode, cfg_gets("wal_dir"), wal_max_rows,
 		     wal_max_size, &INSTANCE_UUID, &replicaset.vclock,
-		     &first_checkpoint->vclock) != 0) {
+		     &checkpoint->vclock) != 0) {
 		diag_raise();
 	}
 	gc_set_wal_watcher();
@@ -2190,15 +2189,17 @@ box_checkpoint()
 		goto end;
 
 	struct vclock vclock;
-	if ((rc = wal_checkpoint(&vclock)))
+	if ((rc = wal_begin_checkpoint(&vclock)))
+		goto end;
+
+	if ((rc = engine_commit_checkpoint(&vclock)))
 		goto end;
 
-	rc = engine_commit_checkpoint(&vclock);
+	wal_commit_checkpoint(&vclock);
+	gc_add_checkpoint(&vclock);
 end:
 	if (rc)
 		engine_abort_checkpoint();
-	else
-		gc_add_checkpoint(&vclock);
 
 	latch_unlock(&schema_lock);
 	box_checkpoint_is_in_progress = false;
diff --git a/src/box/gc.c b/src/box/gc.c
index 55c36d15..05773b91 100644
--- a/src/box/gc.c
+++ b/src/box/gc.c
@@ -218,13 +218,8 @@ gc_run(void)
 	int rc = 0;
 	if (run_engine_gc)
 		rc = engine_collect_garbage(&checkpoint->vclock);
-	/*
-	 * Run wal_collect_garbage() even if we don't need to
-	 * delete any WAL files, because we have to apprise
-	 * the WAL thread of the oldest checkpoint signature.
-	 */
-	if (rc == 0)
-		wal_collect_garbage(vclock, &checkpoint->vclock);
+	if (run_wal_gc && rc == 0)
+		wal_collect_garbage(vclock);
 	latch_unlock(&gc.latch);
 }
 
@@ -236,6 +231,14 @@ gc_process_wal_event(struct wal_watcher_msg *msg)
 {
 	assert((msg->events & WAL_EVENT_GC) != 0);
 
+	/*
+	 * In case of emergency ENOSPC, the WAL thread may delete
+	 * WAL files needed to restore from backup checkpoints,
+	 * which would be kept by the garbage collector otherwise.
+	 * Bring the garbage collector vclock up to date.
+	 */
+	vclock_copy(&gc.vclock, &msg->gc_vclock);
+
 	struct gc_consumer *consumer = gc_tree_first(&gc.consumers);
 	while (consumer != NULL &&
 	       vclock_sum(&consumer->vclock) < vclock_sum(&msg->gc_vclock)) {
diff --git a/src/box/gc.h b/src/box/gc.h
index e1241baa..eab19ba3 100644
--- a/src/box/gc.h
+++ b/src/box/gc.h
@@ -156,20 +156,6 @@ extern struct gc_state gc;
 	rlist_foreach_entry(ref, &(checkpoint)->refs, in_refs)
 
 /**
- * Return the first (oldest) checkpoint known to the garbage
- * collector. If there's no checkpoint, return NULL.
- */
-static inline struct gc_checkpoint *
-gc_first_checkpoint(void)
-{
-	if (rlist_empty(&gc.checkpoints))
-		return NULL;
-
-	return rlist_first_entry(&gc.checkpoints, struct gc_checkpoint,
-				 in_checkpoints);
-}
-
-/**
  * Return the last (newest) checkpoint known to the garbage
  * collector. If there's no checkpoint, return NULL.
  */
diff --git a/src/box/wal.c b/src/box/wal.c
index bc396ca5..3a73c5c5 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -120,7 +120,7 @@ struct wal_writer
 	 */
 	struct vclock vclock;
 	/**
-	 * VClock of the oldest checkpoint available on the instance.
+	 * VClock of the most recent successfully created checkpoint.
 	 * The WAL writer must not delete WAL files that are needed to
 	 * recover from it even if it is running out of disk space.
 	 */
@@ -419,14 +419,14 @@ wal_open(struct wal_writer *writer)
 int
 wal_init(enum wal_mode wal_mode, const char *wal_dirname, int64_t wal_max_rows,
 	 int64_t wal_max_size, const struct tt_uuid *instance_uuid,
-	 const struct vclock *vclock, const struct vclock *first_checkpoint_vclock)
+	 const struct vclock *vclock, const struct vclock *checkpoint_vclock)
 {
 	assert(wal_max_rows > 1);
 
 	struct wal_writer *writer = &wal_writer_singleton;
 	wal_writer_create(writer, wal_mode, wal_dirname, wal_max_rows,
 			  wal_max_size, instance_uuid, vclock,
-			  first_checkpoint_vclock);
+			  checkpoint_vclock);
 
 	/*
 	 * Scan the WAL directory to build an index of all
@@ -477,7 +477,7 @@ struct wal_checkpoint
 };
 
 static int
-wal_checkpoint_f(struct cbus_call_msg *data)
+wal_begin_checkpoint_f(struct cbus_call_msg *data)
 {
 	struct wal_checkpoint *msg = (struct wal_checkpoint *) data;
 	struct wal_writer *writer = &wal_writer_singleton;
@@ -485,7 +485,7 @@ wal_checkpoint_f(struct cbus_call_msg *data)
 		/*
 		 * We're rolling back a failed write and so
 		 * can't make a checkpoint - see the comment
-		 * in wal_checkpoint() for the explanation.
+		 * in wal_begin_checkpoint() for the explanation.
 		 */
 		diag_set(ClientError, ER_CHECKPOINT_ROLLBACK);
 		return -1;
@@ -507,7 +507,7 @@ wal_checkpoint_f(struct cbus_call_msg *data)
 }
 
 int
-wal_checkpoint(struct vclock *vclock)
+wal_begin_checkpoint(struct vclock *vclock)
 {
 	struct wal_writer *writer = &wal_writer_singleton;
 	if (writer->wal_mode == WAL_NONE) {
@@ -528,7 +528,8 @@ wal_checkpoint(struct vclock *vclock)
 	struct wal_checkpoint msg;
 	bool cancellable = fiber_set_cancellable(false);
 	int rc = cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe,
-			   &msg.base, wal_checkpoint_f, NULL, TIMEOUT_INFINITY);
+			   &msg.base, wal_begin_checkpoint_f, NULL,
+			   TIMEOUT_INFINITY);
 	fiber_set_cancellable(cancellable);
 	if (rc != 0)
 		return -1;
@@ -536,19 +537,43 @@ wal_checkpoint(struct vclock *vclock)
 	return 0;
 }
 
+static int
+wal_commit_checkpoint_f(struct cbus_call_msg *data)
+{
+	struct wal_checkpoint *msg = (struct wal_checkpoint *) data;
+	struct wal_writer *writer = &wal_writer_singleton;
+	vclock_copy(&writer->checkpoint_vclock, &msg->vclock);
+	return 0;
+}
+
+void
+wal_commit_checkpoint(const struct vclock *vclock)
+{
+	struct wal_writer *writer = &wal_writer_singleton;
+	if (writer->wal_mode == WAL_NONE) {
+		vclock_copy(&writer->checkpoint_vclock, vclock);
+		return;
+	}
+	struct wal_checkpoint msg;
+	vclock_copy(&msg.vclock, vclock);
+	bool cancellable = fiber_set_cancellable(false);
+	cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe,
+		  &msg.base, wal_commit_checkpoint_f, NULL,
+		  TIMEOUT_INFINITY);
+	fiber_set_cancellable(cancellable);
+}
+
 struct wal_gc_msg
 {
 	struct cbus_call_msg base;
-	const struct vclock *wal_vclock;
-	const struct vclock *checkpoint_vclock;
+	const struct vclock *vclock;
 };
 
 static int
 wal_collect_garbage_f(struct cbus_call_msg *data)
 {
 	struct wal_writer *writer = &wal_writer_singleton;
-	struct wal_gc_msg *msg = (struct wal_gc_msg *)data;
-	const struct vclock *vclock = msg->wal_vclock;
+	const struct vclock *vclock = ((struct wal_gc_msg *)data)->vclock;
 
 	if (!xlog_is_open(&writer->current_wal) &&
 	    vclock_sum(vclock) >= vclock_sum(&writer->vclock)) {
@@ -568,20 +593,17 @@ wal_collect_garbage_f(struct cbus_call_msg *data)
 	if (vclock != NULL)
 		xdir_collect_garbage(&writer->wal_dir, vclock_sum(vclock), 0);
 
-	vclock_copy(&writer->checkpoint_vclock, msg->checkpoint_vclock);
 	return 0;
 }
 
 void
-wal_collect_garbage(const struct vclock *wal_vclock,
-		    const struct vclock *checkpoint_vclock)
+wal_collect_garbage(const struct vclock *vclock)
 {
 	struct wal_writer *writer = &wal_writer_singleton;
 	if (writer->wal_mode == WAL_NONE)
 		return;
 	struct wal_gc_msg msg;
-	msg.wal_vclock = wal_vclock;
-	msg.checkpoint_vclock = checkpoint_vclock;
+	msg.vclock = vclock;
 	bool cancellable = fiber_set_cancellable(false);
 	cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe, &msg.base,
 		  wal_collect_garbage_f, NULL, TIMEOUT_INFINITY);
diff --git a/src/box/wal.h b/src/box/wal.h
index 7ca27f1a..5f3a66ce 100644
--- a/src/box/wal.h
+++ b/src/box/wal.h
@@ -58,7 +58,7 @@ wal_thread_start();
 int
 wal_init(enum wal_mode wal_mode, const char *wal_dirname, int64_t wal_max_rows,
 	 int64_t wal_max_size, const struct tt_uuid *instance_uuid,
-	 const struct vclock *vclock, const struct vclock *first_checkpoint_vclock);
+	 const struct vclock *vclock, const struct vclock *checkpoint_vclock);
 
 void
 wal_thread_stop();
@@ -179,20 +179,22 @@ wal_flush(void);
  * is supposed to be used to identify the new checkpoint.
  */
 int
-wal_checkpoint(struct vclock *vclock);
+wal_begin_checkpoint(struct vclock *vclock);
+
+/**
+ * This function is called upon successful checkpoint creation.
+ * It updates the WAL thread's version of the last checkpoint
+ * vclock.
+ */
+void
+wal_commit_checkpoint(const struct vclock *vclock);
 
 /**
  * Remove WAL files that are not needed by consumers reading
- * rows at @wal_vclock or newer.
- *
- * Update the oldest checkpoint signature with @checkpoint_vclock.
- * WAL thread will delete WAL files that are not needed to
- * recover from the oldest checkpoint if it runs out of disk
- * space.
+ * rows at @vclock or newer.
  */
 void
-wal_collect_garbage(const struct vclock *wal_vclock,
-		    const struct vclock *checkpoint_vclock);
+wal_collect_garbage(const struct vclock *vclock);
 
 void
 wal_init_vy_log();
diff --git a/test/replication/gc_no_space.result b/test/replication/gc_no_space.result
index ceea8ab3..5c64bea4 100644
--- a/test/replication/gc_no_space.result
+++ b/test/replication/gc_no_space.result
@@ -160,10 +160,21 @@ check_snap_count(2)
 ---
 - true
 ...
-#box.info.gc().consumers -- 3
+gc = box.info.gc()
+---
+...
+#gc.consumers -- 3
 ---
 - 3
 ...
+#gc.checkpoints -- 2
+---
+- 2
+...
+gc.signature == gc.consumers[1].signature
+---
+- true
+...
 --
 -- Inject a ENOSPC error and check that the WAL thread deletes
 -- old WAL files to prevent the user from seeing the error.
@@ -188,15 +199,28 @@ check_snap_count(2)
 ---
 - true
 ...
-#box.info.gc().consumers -- 1
+gc = box.info.gc()
+---
+...
+#gc.consumers -- 1
 ---
 - 1
 ...
+#gc.checkpoints -- 2
+---
+- 2
+...
+gc.signature == gc.consumers[1].signature
+---
+- true
+...
 --
 -- Check that the WAL thread never deletes WAL files that are
--- needed for recovery from a checkpoint.
+-- needed for recovery from the last checkpoint, but may delete
+-- older WAL files that would be kept otherwise for recovery
+-- from backup checkpoints.
 --
-errinj.set('ERRINJ_WAL_FALLOCATE', 2)
+errinj.set('ERRINJ_WAL_FALLOCATE', 3)
 ---
 - ok
 ...
@@ -208,7 +232,7 @@ errinj.info()['ERRINJ_WAL_FALLOCATE'].state -- 0
 ---
 - 0
 ...
-check_wal_count(2)
+check_wal_count(1)
 ---
 - true
 ...
@@ -216,10 +240,21 @@ check_snap_count(2)
 ---
 - true
 ...
-#box.info.gc().consumers -- 0
+gc = box.info.gc()
+---
+...
+#gc.consumers -- 0
 ---
 - 0
 ...
+#gc.checkpoints -- 2
+---
+- 2
+...
+gc.signature == gc.checkpoints[2].signature
+---
+- true
+...
 s:drop()
 ---
 ...
diff --git a/test/replication/gc_no_space.test.lua b/test/replication/gc_no_space.test.lua
index be2e3229..7f5ab803 100644
--- a/test/replication/gc_no_space.test.lua
+++ b/test/replication/gc_no_space.test.lua
@@ -70,7 +70,10 @@ s:auto_increment{}
 
 check_wal_count(5)
 check_snap_count(2)
-#box.info.gc().consumers -- 3
+gc = box.info.gc()
+#gc.consumers -- 3
+#gc.checkpoints -- 2
+gc.signature == gc.consumers[1].signature
 
 --
 -- Inject a ENOSPC error and check that the WAL thread deletes
@@ -82,19 +85,27 @@ errinj.info()['ERRINJ_WAL_FALLOCATE'].state -- 0
 
 check_wal_count(3)
 check_snap_count(2)
-#box.info.gc().consumers -- 1
+gc = box.info.gc()
+#gc.consumers -- 1
+#gc.checkpoints -- 2
+gc.signature == gc.consumers[1].signature
 
 --
 -- Check that the WAL thread never deletes WAL files that are
--- needed for recovery from a checkpoint.
+-- needed for recovery from the last checkpoint, but may delete
+-- older WAL files that would be kept otherwise for recovery
+-- from backup checkpoints.
 --
-errinj.set('ERRINJ_WAL_FALLOCATE', 2)
+errinj.set('ERRINJ_WAL_FALLOCATE', 3)
 s:auto_increment{} -- failure
 errinj.info()['ERRINJ_WAL_FALLOCATE'].state -- 0
 
-check_wal_count(2)
+check_wal_count(1)
 check_snap_count(2)
-#box.info.gc().consumers -- 0
+gc = box.info.gc()
+#gc.consumers -- 0
+#gc.checkpoints -- 2
+gc.signature == gc.checkpoints[2].signature
 
 s:drop()
 box.schema.user.revoke('guest', 'replication')
-- 
2.11.0


* [PATCH 3/9] recovery: restore garbage collector vclock after restart
  2018-11-28 16:14 [PATCH 0/9] Allow to limit size of WAL files Vladimir Davydov
  2018-11-28 16:14 ` [PATCH 1/9] wal: separate checkpoint and flush paths Vladimir Davydov
  2018-11-28 16:14 ` [PATCH 2/9] wal: remove files needed for recovery from backup checkpoints on ENOSPC Vladimir Davydov
@ 2018-11-28 16:14 ` Vladimir Davydov
  2018-11-29 16:37   ` [tarantool-patches] " Konstantin Osipov
  2018-11-28 16:14 ` [PATCH 4/9] gc: run garbage collection in background Vladimir Davydov
                   ` (5 subsequent siblings)
  8 siblings, 1 reply; 28+ messages in thread
From: Vladimir Davydov @ 2018-11-28 16:14 UTC
  To: kostja; +Cc: tarantool-patches

After restart, the garbage collector vclock is reset to the vclock of
the oldest preserved checkpoint, which is incorrect: the actual value
may be less if there is a replica that lagged behind, and it may be
greater if the WAL thread hit ENOSPC and had to remove some WAL files
to continue. Fix it.
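
The rule implemented by the recovery_scan() change in this patch can be
summarized with a small sketch. It is a simplification:
toy_recover_gc_signature() is a made-up helper that compares scalar
signatures (vclock sums), whereas the real code works with struct
vclock and vclock_compare().

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/*
 * Pick the garbage collector position after restart.
 *
 * wal_signatures holds the signatures of the WAL files found in
 * the WAL directory, in ascending order. If there are no WAL
 * files, or the newest one is older than the checkpoint used for
 * recovery, fall back to the checkpoint signature. Otherwise use
 * the oldest WAL file still present in the directory.
 */
static int64_t
toy_recover_gc_signature(const int64_t *wal_signatures, int wal_count,
			 int64_t checkpoint_signature)
{
	assert(wal_count >= 0);
	if (wal_count == 0 ||
	    wal_signatures[wal_count - 1] < checkpoint_signature)
		return checkpoint_signature;
	return wal_signatures[0];
}
```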

A note about the xlog/panic_on_wal_error test. To check that
replication stops if some xlogs are missing, the test first removes
xlogs on the master, then restarts the master, then tries to start the
replica, expecting replication to fail. Well, it shouldn't - the replica
should rebootstrap instead. It didn't rebootstrap before this patch
though, because the master reported a wrong garbage collector vclock (as
it didn't recover it on restart). After this patch the replica would
rebootstrap and the test would hang. Fix this by restarting the master
before removing xlog files.
---
 src/box/box.cc                        |  2 +-
 src/box/recovery.cc                   | 14 +++++++++-----
 src/box/recovery.h                    |  5 ++++-
 test/replication/gc_no_space.result   | 15 +++++++++++----
 test/replication/gc_no_space.test.lua |  7 +++++--
 test/xlog/panic_on_wal_error.result   |  2 +-
 test/xlog/panic_on_wal_error.test.lua |  2 +-
 7 files changed, 32 insertions(+), 15 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index 72788f82..a5e1f07a 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1918,7 +1918,7 @@ local_recovery(const struct tt_uuid *instance_uuid,
 	 * so we must reflect this in replicaset vclock to
 	 * not attempt to apply these rows twice.
 	 */
-	recovery_scan(recovery, &replicaset.vclock);
+	recovery_scan(recovery, &replicaset.vclock, &gc.vclock);
 
 	if (wal_dir_lock >= 0) {
 		box_listen();
diff --git a/src/box/recovery.cc b/src/box/recovery.cc
index 77357f04..64d50989 100644
--- a/src/box/recovery.cc
+++ b/src/box/recovery.cc
@@ -117,21 +117,25 @@ recovery_new(const char *wal_dirname, bool force_recovery,
 }
 
 void
-recovery_scan(struct recovery *r, struct vclock *end_vclock)
+recovery_scan(struct recovery *r, struct vclock *end_vclock,
+	      struct vclock *gc_vclock)
 {
 	xdir_scan_xc(&r->wal_dir);
 
-	struct vclock *vclock = vclockset_last(&r->wal_dir.index);
-	if (vclock == NULL || vclock_compare(vclock, &r->vclock) < 0) {
+	if (xdir_last_vclock(&r->wal_dir, end_vclock) < 0 ||
+	    vclock_compare(end_vclock, &r->vclock) < 0) {
 		/* No xlogs after last checkpoint. */
+		vclock_copy(gc_vclock, &r->vclock);
 		vclock_copy(end_vclock, &r->vclock);
 		return;
 	}
 
+	if (xdir_first_vclock(&r->wal_dir, gc_vclock) < 0)
+		unreachable();
+
 	/* Scan the last xlog to find end vclock. */
-	vclock_copy(end_vclock, vclock);
 	struct xlog_cursor cursor;
-	if (xdir_open_cursor(&r->wal_dir, vclock_sum(vclock), &cursor) != 0)
+	if (xdir_open_cursor(&r->wal_dir, vclock_sum(end_vclock), &cursor) != 0)
 		return;
 	struct xrow_header row;
 	while (xlog_cursor_next(&cursor, &row, true) == 0)
diff --git a/src/box/recovery.h b/src/box/recovery.h
index 5882d969..662be3ca 100644
--- a/src/box/recovery.h
+++ b/src/box/recovery.h
@@ -72,9 +72,12 @@ recovery_delete(struct recovery *r);
  * Scan the WAL directory, build an index of all found
  * WAL files, then scan the most recent WAL file to find
  * the vclock of the last record (returned in @end_vclock).
+ * @gc_vclock is set to the oldest vclock available in the
+ * WAL directory.
  */
 void
-recovery_scan(struct recovery *r, struct vclock *end_vclock);
+recovery_scan(struct recovery *r,  struct vclock *end_vclock,
+	      struct vclock *gc_vclock);
 
 void
 recovery_follow_local(struct recovery *r, struct xstream *stream,
diff --git a/test/replication/gc_no_space.result b/test/replication/gc_no_space.result
index 5c64bea4..b2d3e207 100644
--- a/test/replication/gc_no_space.result
+++ b/test/replication/gc_no_space.result
@@ -43,9 +43,6 @@ test_run:cmd("setopt delimiter ''");
 ---
 - true
 ...
-default_checkpoint_count = box.cfg.checkpoint_count
----
-...
 box.cfg{checkpoint_count = 2}
 ---
 ...
@@ -264,6 +261,16 @@ box.schema.user.revoke('guest', 'replication')
 test_run:cleanup_cluster()
 ---
 ...
-box.cfg{checkpoint_count = default_checkpoint_count}
+-- Check that the garbage collector vclock is recovered correctly.
+test_run:cmd("restart server default")
+gc = box.info.gc()
+---
+...
+#gc.checkpoints -- 2
 ---
+- 2
+...
+gc.signature == gc.checkpoints[2].signature
+---
+- true
 ...
diff --git a/test/replication/gc_no_space.test.lua b/test/replication/gc_no_space.test.lua
index 7f5ab803..6940996f 100644
--- a/test/replication/gc_no_space.test.lua
+++ b/test/replication/gc_no_space.test.lua
@@ -26,7 +26,6 @@ function check_snap_count(count)
 end;
 test_run:cmd("setopt delimiter ''");
 
-default_checkpoint_count = box.cfg.checkpoint_count
 box.cfg{checkpoint_count = 2}
 
 test_run:cleanup_cluster()
@@ -111,4 +110,8 @@ s:drop()
 box.schema.user.revoke('guest', 'replication')
 test_run:cleanup_cluster()
 
-box.cfg{checkpoint_count = default_checkpoint_count}
+-- Check that the garbage collector vclock is recovered correctly.
+test_run:cmd("restart server default")
+gc = box.info.gc()
+#gc.checkpoints -- 2
+gc.signature == gc.checkpoints[2].signature
diff --git a/test/xlog/panic_on_wal_error.result b/test/xlog/panic_on_wal_error.result
index 345534ba..22f14f91 100644
--- a/test/xlog/panic_on_wal_error.result
+++ b/test/xlog/panic_on_wal_error.result
@@ -67,6 +67,7 @@ test_run:cmd("stop server replica")
 ---
 - true
 ...
+test_run:cmd("restart server default")
 box.space.test:auto_increment{'after snapshot'}
 ---
 - [2, 'after snapshot']
@@ -94,7 +95,6 @@ files = fio.glob(glob)
 for _, file in pairs(files) do fio.unlink(file) end
 ---
 ...
-test_run:cmd("restart server default")
 --
 -- make sure the server has some xlogs, otherwise the
 -- replica doesn't discover the gap in the logs
diff --git a/test/xlog/panic_on_wal_error.test.lua b/test/xlog/panic_on_wal_error.test.lua
index 29410cb2..2e95431c 100644
--- a/test/xlog/panic_on_wal_error.test.lua
+++ b/test/xlog/panic_on_wal_error.test.lua
@@ -31,6 +31,7 @@ box.space.test:select{}
 --
 test_run:cmd("switch default")
 test_run:cmd("stop server replica")
+test_run:cmd("restart server default")
 box.space.test:auto_increment{'after snapshot'}
 box.space.test:auto_increment{'after snapshot - one more row'}
 --
@@ -41,7 +42,6 @@ fio = require('fio')
 glob = fio.pathjoin(box.cfg.wal_dir, '*.xlog')
 files = fio.glob(glob)
 for _, file in pairs(files) do fio.unlink(file) end
-test_run:cmd("restart server default")
 --
 -- make sure the server has some xlogs, otherwise the
 -- replica doesn't discover the gap in the logs
-- 
2.11.0


* [PATCH 4/9] gc: run garbage collection in background
  2018-11-28 16:14 [PATCH 0/9] Allow to limit size of WAL files Vladimir Davydov
                   ` (2 preceding siblings ...)
  2018-11-28 16:14 ` [PATCH 3/9] recovery: restore garbage collector vclock after restart Vladimir Davydov
@ 2018-11-28 16:14 ` Vladimir Davydov
  2018-11-29 16:42   ` [tarantool-patches] " Konstantin Osipov
  2018-11-28 16:14 ` [PATCH 5/9] gc: do not use WAL watcher API for deactivating stale consumers Vladimir Davydov
                   ` (4 subsequent siblings)
  8 siblings, 1 reply; 28+ messages in thread
From: Vladimir Davydov @ 2018-11-28 16:14 UTC (permalink / raw)
  To: kostja; +Cc: tarantool-patches

Currently, garbage collection is executed synchronously by functions
that may trigger it, such as gc_consumer_advance or gc_add_checkpoint.
As a result, one has to be very cautious when using those functions as
they may yield at any time. For example, we can't shoot off stale
consumers right in the tx_prio handler - we have to use the rather
clumsy WAL watcher interface instead. Besides, in the future, when the
garbage collector state is persisted, we will need to call those
functions from an on_commit trigger callback, where yielding is not
normally allowed.

Actually, there's no reason to remove old files synchronously - we could
as well do it in the background. So this patch introduces a background
garbage collection fiber that executes gc_run when woken up. Now all
functions that might trigger garbage collection wake up this fiber
instead of executing gc_run directly.
---
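The scheduling scheme described above relies on two counters, scheduled
and completed. The sketch below models only that bookkeeping, without
fibers or condition variables, to show when a wakeup is needed and when
a waiter may stop waiting. The struct and function names are
hypothetical and are not part of the patch.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-in for the two scheduling fields of struct gc_state. */
struct gc_counters {
	unsigned scheduled;
	unsigned completed;
};

/*
 * Request a round of garbage collection. Returns true if the worker
 * was idle and has to be woken up, false if a round is already pending
 * and the worker will pick the request up by itself.
 */
static bool
gc_counters_schedule(struct gc_counters *c)
{
	return c->scheduled++ == c->completed;
}

/*
 * One iteration of the worker loop. Returns false if nothing is
 * pending. Otherwise acknowledges every request that was visible at
 * the start of the round. In the patch, gc_run() is called between
 * reading the delta and updating the completed counter.
 */
static bool
gc_counters_run_round(struct gc_counters *c)
{
	unsigned delta = c->scheduled - c->completed;
	if (delta == 0)
		return false;
	c->completed += delta;
	return true;
}

/* True if a waiter that sampled scheduled as target may stop waiting. */
static bool
gc_counters_is_done(const struct gc_counters *c, unsigned target)
{
	return c->completed >= target;
}
```

Under these assumptions, only the first request after an idle period
reports that a wakeup is needed; later requests just bump the counter,
which is how the patch avoids waking a fiber that is in the middle of a
round.
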
 src/box/box.cc | 12 +++++++++
 src/box/gc.c   | 79 ++++++++++++++++++++++++++++++++++++++++++++++------------
 src/box/gc.h   | 35 +++++++++++++++++++++-----
 3 files changed, 104 insertions(+), 22 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index a5e1f07a..bb7c1bb9 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -2203,6 +2203,18 @@ end:
 
 	latch_unlock(&schema_lock);
 	box_checkpoint_is_in_progress = false;
+
+	/*
+	 * Wait for background garbage collection that might
+	 * have been triggered by this checkpoint to complete.
+	 * Strictly speaking, it isn't necessary, but it
+	 * simplifies testing as it guarantees that by the
+	 * time box.snapshot() returns, all outdated checkpoint
+	 * files have been removed.
+	 */
+	if (!rc)
+		gc_wait();
+
 	return rc;
 }
 
diff --git a/src/box/gc.c b/src/box/gc.c
index 05773b91..9c049977 100644
--- a/src/box/gc.c
+++ b/src/box/gc.c
@@ -45,8 +45,9 @@
 #include <small/rlist.h>
 
 #include "diag.h"
+#include "fiber.h"
+#include "fiber_cond.h"
 #include "say.h"
-#include "latch.h"
 #include "vclock.h"
 #include "cbus.h"
 #include "engine.h"		/* engine_collect_garbage() */
@@ -54,6 +55,9 @@
 
 struct gc_state gc;
 
+static int
+gc_fiber_f(va_list);
+
 /**
  * Comparator used for ordering gc_consumer objects by signature
  * in a binary tree.
@@ -100,7 +104,13 @@ gc_init(void)
 	vclock_create(&gc.vclock);
 	rlist_create(&gc.checkpoints);
 	gc_tree_new(&gc.consumers);
-	latch_create(&gc.latch);
+	fiber_cond_create(&gc.cond);
+
+	gc.fiber = fiber_new("gc", gc_fiber_f);
+	if (gc.fiber == NULL)
+		panic("failed to start garbage collection fiber");
+
+	fiber_start(gc.fiber);
 }
 
 static void
@@ -202,13 +212,6 @@ gc_run(void)
 		return; /* nothing to do */
 
 	/*
-	 * Engine callbacks may sleep, because they use coio for
-	 * removing files. Make sure we won't try to remove the
-	 * same file multiple times by serializing concurrent gc
-	 * executions.
-	 */
-	latch_lock(&gc.latch);
-	/*
 	 * Run garbage collection.
 	 *
 	 * The order is important here: we must invoke garbage
@@ -220,7 +223,51 @@ gc_run(void)
 		rc = engine_collect_garbage(&checkpoint->vclock);
 	if (run_wal_gc && rc == 0)
 		wal_collect_garbage(vclock);
-	latch_unlock(&gc.latch);
+}
+
+static int
+gc_fiber_f(va_list ap)
+{
+	(void)ap;
+	while (!fiber_is_cancelled()) {
+		int delta = gc.scheduled - gc.completed;
+		if (delta == 0) {
+			/* No pending garbage collection. */
+			fiber_sleep(TIMEOUT_INFINITY);
+			continue;
+		}
+		assert(delta > 0);
+		gc_run();
+		gc.completed += delta;
+		fiber_cond_signal(&gc.cond);
+	}
+	return 0;
+}
+
+/**
+ * Trigger asynchronous garbage collection.
+ */
+static void
+gc_schedule(void)
+{
+	/*
+	 * Do not wake up the background fiber if it's executing
+	 * the garbage collection procedure right now, because
+	 * it may be waiting for a cbus message, which doesn't
+	 * tolerate spurious wakeups. Just increment the counter
+	 * then - it will rerun garbage collection as soon as
+	 * the current round completes.
+	 */
+	if (gc.scheduled++ == gc.completed)
+		fiber_wakeup(gc.fiber);
+}
+
+void
+gc_wait(void)
+{
+	unsigned scheduled = gc.scheduled;
+	while (gc.completed < scheduled)
+		fiber_cond_wait(&gc.cond);
 }
 
 /**
@@ -255,7 +302,7 @@ gc_process_wal_event(struct wal_watcher_msg *msg)
 
 		consumer = next;
 	}
-	gc_run();
+	gc_schedule();
 }
 
 void
@@ -276,7 +323,7 @@ gc_add_checkpoint(const struct vclock *vclock)
 		 * Rerun the garbage collector in this case, just
 		 * in case box.cfg.checkpoint_count has changed.
 		 */
-		gc_run();
+		gc_schedule();
 		return;
 	}
 	assert(last_checkpoint == NULL ||
@@ -295,7 +342,7 @@ gc_add_checkpoint(const struct vclock *vclock)
 	rlist_add_tail_entry(&gc.checkpoints, checkpoint, in_checkpoints);
 	gc.checkpoint_count++;
 
-	gc_run();
+	gc_schedule();
 }
 
 void
@@ -314,7 +361,7 @@ void
 gc_unref_checkpoint(struct gc_checkpoint_ref *ref)
 {
 	rlist_del_entry(ref, in_refs);
-	gc_run();
+	gc_schedule();
 }
 
 struct gc_consumer *
@@ -342,7 +389,7 @@ gc_consumer_unregister(struct gc_consumer *consumer)
 {
 	if (!consumer->is_inactive) {
 		gc_tree_remove(&gc.consumers, consumer);
-		gc_run();
+		gc_schedule();
 	}
 	gc_consumer_delete(consumer);
 }
@@ -376,7 +423,7 @@ gc_consumer_advance(struct gc_consumer *consumer, const struct vclock *vclock)
 	if (update_tree)
 		gc_tree_insert(&gc.consumers, consumer);
 
-	gc_run();
+	gc_schedule();
 }
 
 struct gc_consumer *
diff --git a/src/box/gc.h b/src/box/gc.h
index eab19ba3..6e96d7bb 100644
--- a/src/box/gc.h
+++ b/src/box/gc.h
@@ -34,8 +34,8 @@
 #include <stddef.h>
 #include <small/rlist.h>
 
+#include "fiber_cond.h"
 #include "vclock.h"
-#include "latch.h"
 #include "wal.h"
 #include "trivia/util.h"
 
@@ -43,6 +43,7 @@
 extern "C" {
 #endif /* defined(__cplusplus) */
 
+struct fiber;
 struct gc_consumer;
 
 enum { GC_NAME_MAX = 64 };
@@ -122,15 +123,30 @@ struct gc_state {
 	/** Registered consumers, linked by gc_consumer::node. */
 	gc_tree_t consumers;
 	/**
-	 * Latch serializing concurrent invocations of engine
-	 * garbage collection callbacks.
-	 */
-	struct latch latch;
-	/**
 	 * WAL event watcher. Needed to shoot off stale consumers
 	 * when a WAL file is deleted due to ENOSPC.
 	 */
 	struct wal_watcher wal_watcher;
+	/** Fiber that removes old files in the background. */
+	struct fiber *fiber;
+	/**
+	 * Condition variable signaled by the background fiber
+	 * whenever it completes a round of garbage collection.
+	 * Used to wait for garbage collection to complete.
+	 */
+	struct fiber_cond cond;
+	/**
+	 * The following two members are used for scheduling
+	 * background garbage collection and waiting for it to
+	 * complete. To trigger background garbage collection,
+	 * @scheduled is incremented. Whenever a round of garbage
+	 * collection completes, @completed is incremented. Thus
+	 * to wait for background garbage collection scheduled
+	 * at a particular moment of time to complete, one should
+	 * sleep until @completed reaches the value of @scheduled
+	 * taken at that moment of time.
+	 */
+	unsigned completed, scheduled;
 };
 extern struct gc_state gc;
 
@@ -188,6 +204,13 @@ void
 gc_free(void);
 
 /**
+ * Wait for background garbage collection scheduled prior
+ * to this point to complete.
+ */
+void
+gc_wait(void);
+
+/**
  * Update the minimal number of checkpoints to preserve.
  * Called when box.cfg.checkpoint_count is updated.
  *
-- 
2.11.0


* [PATCH 5/9] gc: do not use WAL watcher API for deactivating stale consumers
  2018-11-28 16:14 [PATCH 0/9] Allow to limit size of WAL files Vladimir Davydov
                   ` (3 preceding siblings ...)
  2018-11-28 16:14 ` [PATCH 4/9] gc: run garbage collection in background Vladimir Davydov
@ 2018-11-28 16:14 ` Vladimir Davydov
  2018-11-29 17:02   ` [tarantool-patches] " Konstantin Osipov
  2018-11-28 16:14 ` [PATCH 6/9] wal: simplify watcher API Vladimir Davydov
                   ` (3 subsequent siblings)
  8 siblings, 1 reply; 28+ messages in thread
From: Vladimir Davydov @ 2018-11-28 16:14 UTC (permalink / raw)
  To: kostja; +Cc: tarantool-patches

The WAL thread may delete old WAL files if it gets an ENOSPC error.
Currently, we use the WAL watcher API to notify the TX thread about it
so that it can shoot off stale replicas. This looks ugly, because the
WAL watcher API was initially designed to propagate WAL changes to relay
threads and the new event WAL_EVENT_GC, which was introduced for
notifying about ENOSPC-driven garbage collection, isn't used anywhere
else. Besides, there's already a pipe from WAL to TX - we could reuse it
instead of opening another one.

If we went down that path, then in order to trigger a checkpoint
from the WAL thread (see #1082), we would have to introduce yet another
esoteric WAL watcher event, making the whole design look even uglier.
That said, let's rewrite the garbage collection notification procedure
using a plain callback instead of abusing the WAL watcher API.
---
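The idea of piggybacking the notification on the request message can be
sketched as follows. This is a simplified model, not the code from the
patch: the vclock is replaced by a single integer signature, and all
names (request_msg, writer_note_gc, request_complete, record_gc) are
hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical callback type; the patch passes a struct vclock instead. */
typedef void (*on_gc_f)(int64_t oldest_signature);

/* Simplified request message travelling from the writer back to tx. */
struct request_msg {
	/* Set if the writer had to delete old files while serving it. */
	bool gc_executed;
	/* Oldest position still available; valid only if gc_executed. */
	int64_t gc_signature;
};

/* Writer side: remember that emergency cleanup happened. */
static void
writer_note_gc(struct request_msg *msg, int64_t oldest_signature)
{
	msg->gc_executed = true;
	msg->gc_signature = oldest_signature;
}

/* Completion side: invoke the callback only if cleanup took place. */
static void
request_complete(const struct request_msg *msg, on_gc_f on_gc)
{
	if (msg->gc_executed)
		on_gc(msg->gc_signature);
}

/* Example callback that records the last reported position. */
static int64_t last_reported = -1;

static void
record_gc(int64_t oldest_signature)
{
	last_reported = oldest_signature;
}
```

In this model no extra channel is needed: the flag and the position
travel with the message that is returned to the requester anyway.
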
 src/box/box.cc |  9 +++++++--
 src/box/gc.c   | 33 ++++-----------------------------
 src/box/gc.h   | 13 +++++++------
 src/box/wal.c  | 46 ++++++++++++++++++++++++++++++++--------------
 src/box/wal.h  | 19 +++++++++++--------
 5 files changed, 61 insertions(+), 59 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index bb7c1bb9..20412af4 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -2011,6 +2011,12 @@ tx_prio_cb(struct ev_loop *loop, ev_watcher *watcher, int events)
 	cbus_process(endpoint);
 }
 
+static void
+on_wal_garbage_collection(const struct vclock *vclock)
+{
+	gc_advance(vclock);
+}
+
 void
 box_init(void)
 {
@@ -2125,10 +2131,9 @@ box_cfg_xc(void)
 	enum wal_mode wal_mode = box_check_wal_mode(cfg_gets("wal_mode"));
 	if (wal_init(wal_mode, cfg_gets("wal_dir"), wal_max_rows,
 		     wal_max_size, &INSTANCE_UUID, &replicaset.vclock,
-		     &checkpoint->vclock) != 0) {
+		     &checkpoint->vclock, on_wal_garbage_collection) != 0) {
 		diag_raise();
 	}
-	gc_set_wal_watcher();
 
 	rmean_cleanup(rmean_box);
 
diff --git a/src/box/gc.c b/src/box/gc.c
index 9c049977..87273b8d 100644
--- a/src/box/gc.c
+++ b/src/box/gc.c
@@ -113,26 +113,6 @@ gc_init(void)
 	fiber_start(gc.fiber);
 }
 
-static void
-gc_process_wal_event(struct wal_watcher_msg *);
-
-void
-gc_set_wal_watcher(void)
-{
-	/*
-	 * Since the function is called from box_cfg() it is
-	 * important that we do not pass a message processing
-	 * callback to wal_set_watcher(). Doing so would cause
-	 * credentials corruption in the fiber executing
-	 * box_cfg() in case it processes some iproto messages.
-	 * Besides, by the time the function is called
-	 * tx_fiber_pool is already set up and it will process
-	 * all the messages directed to "tx" endpoint safely.
-	 */
-	wal_set_watcher(&gc.wal_watcher, "tx", gc_process_wal_event,
-			NULL, WAL_EVENT_GC);
-}
-
 void
 gc_free(void)
 {
@@ -270,25 +250,20 @@ gc_wait(void)
 		fiber_cond_wait(&gc.cond);
 }
 
-/**
- * Deactivate consumers that need files deleted by the WAL thread.
- */
-static void
-gc_process_wal_event(struct wal_watcher_msg *msg)
+void
+gc_advance(const struct vclock *vclock)
 {
-	assert((msg->events & WAL_EVENT_GC) != 0);
-
 	/*
 	 * In case of emergency ENOSPC, the WAL thread may delete
 	 * WAL files needed to restore from backup checkpoints,
 	 * which would be kept by the garbage collector otherwise.
 	 * Bring the garbage collector vclock up to date.
 	 */
-	vclock_copy(&gc.vclock, &msg->gc_vclock);
+	vclock_copy(&gc.vclock, vclock);
 
 	struct gc_consumer *consumer = gc_tree_first(&gc.consumers);
 	while (consumer != NULL &&
-	       vclock_sum(&consumer->vclock) < vclock_sum(&msg->gc_vclock)) {
+	       vclock_sum(&consumer->vclock) < vclock_sum(vclock)) {
 		struct gc_consumer *next = gc_tree_next(&gc.consumers,
 							consumer);
 		assert(!consumer->is_inactive);
diff --git a/src/box/gc.h b/src/box/gc.h
index 6e96d7bb..bcf7d212 100644
--- a/src/box/gc.h
+++ b/src/box/gc.h
@@ -192,12 +192,6 @@ void
 gc_init(void);
 
 /**
- * Set WAL watcher. Called after WAL is initialized.
- */
-void
-gc_set_wal_watcher(void);
-
-/**
  * Destroy the garbage collection state.
  */
 void
@@ -211,6 +205,13 @@ void
 gc_wait(void);
 
 /**
+ * Advance the garbage collector vclock to the given position.
+ * Deactivate WAL consumers that need older data.
+ */
+void
+gc_advance(const struct vclock *vclock);
+
+/**
  * Update the minimal number of checkpoints to preserve.
  * Called when box.cfg.checkpoint_count is updated.
  *
diff --git a/src/box/wal.c b/src/box/wal.c
index 3a73c5c5..c47535e4 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -91,6 +91,7 @@ struct wal_writer
 {
 	struct journal base;
 	/* ----------------- tx ------------------- */
+	wal_on_garbage_collection_f on_garbage_collection;
 	/**
 	 * The rollback queue. An accumulator for all requests
 	 * that need to be rolled back. Also acts as a valve
@@ -152,6 +153,17 @@ struct wal_msg {
 	 * be rolled back.
 	 */
 	struct stailq rollback;
+	/**
+	 * Set if the WAL thread ran out of disk space while
+	 * processing this request and had to delete some old
+	 * WAL files.
+	 */
+	bool gc_executed;
+	/**
+	 * VClock of the oldest WAL row available on the instance.
+	 * Initialized only if @gc_executed is set.
+	 */
+	struct vclock gc_vclock;
 };
 
 /**
@@ -188,6 +200,7 @@ wal_msg_create(struct wal_msg *batch)
 {
 	cmsg_init(&batch->base, wal_request_route);
 	batch->approx_len = 0;
+	batch->gc_executed = false;
 	stailq_create(&batch->commit);
 	stailq_create(&batch->rollback);
 }
@@ -254,6 +267,7 @@ tx_schedule_queue(struct stailq *queue)
 static void
 tx_schedule_commit(struct cmsg *msg)
 {
+	struct wal_writer *writer = &wal_writer_singleton;
 	struct wal_msg *batch = (struct wal_msg *) msg;
 	/*
 	 * Move the rollback list to the writer first, since
@@ -261,11 +275,13 @@ tx_schedule_commit(struct cmsg *msg)
 	 * iteration of tx_schedule_queue loop.
 	 */
 	if (! stailq_empty(&batch->rollback)) {
-		struct wal_writer *writer = &wal_writer_singleton;
 		/* Closes the input valve. */
 		stailq_concat(&writer->rollback, &batch->rollback);
 	}
 	tx_schedule_queue(&batch->commit);
+
+	if (batch->gc_executed)
+		writer->on_garbage_collection(&batch->gc_vclock);
 }
 
 static void
@@ -296,7 +312,8 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
 		  const char *wal_dirname, int64_t wal_max_rows,
 		  int64_t wal_max_size, const struct tt_uuid *instance_uuid,
 		  const struct vclock *vclock,
-		  const struct vclock *checkpoint_vclock)
+		  const struct vclock *checkpoint_vclock,
+		  wal_on_garbage_collection_f on_garbage_collection)
 {
 	writer->wal_mode = wal_mode;
 	writer->wal_max_rows = wal_max_rows;
@@ -315,6 +332,8 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
 	vclock_copy(&writer->vclock, vclock);
 	vclock_copy(&writer->checkpoint_vclock, checkpoint_vclock);
 	rlist_create(&writer->watchers);
+
+	writer->on_garbage_collection = on_garbage_collection;
 }
 
 /** Destroy a WAL writer structure. */
@@ -419,14 +438,15 @@ wal_open(struct wal_writer *writer)
 int
 wal_init(enum wal_mode wal_mode, const char *wal_dirname, int64_t wal_max_rows,
 	 int64_t wal_max_size, const struct tt_uuid *instance_uuid,
-	 const struct vclock *vclock, const struct vclock *checkpoint_vclock)
+	 const struct vclock *vclock, const struct vclock *checkpoint_vclock,
+	 wal_on_garbage_collection_f on_garbage_collection)
 {
 	assert(wal_max_rows > 1);
 
 	struct wal_writer *writer = &wal_writer_singleton;
 	wal_writer_create(writer, wal_mode, wal_dirname, wal_max_rows,
 			  wal_max_size, instance_uuid, vclock,
-			  checkpoint_vclock);
+			  checkpoint_vclock, on_garbage_collection);
 
 	/*
 	 * Scan the WAL directory to build an index of all
@@ -664,14 +684,14 @@ wal_opt_rotate(struct wal_writer *writer)
 }
 
 /**
- * Make sure there's enough disk space to append @len bytes
- * of data to the current WAL.
+ * Make sure there's enough disk space to process the given
+ * WAL message.
  *
  * If fallocate() fails with ENOSPC, delete old WAL files
  * that are not needed for recovery and retry.
  */
 static int
-wal_fallocate(struct wal_writer *writer, size_t len)
+wal_fallocate(struct wal_writer *writer, struct wal_msg *msg)
 {
 	bool warn_no_space = true;
 	struct xlog *l = &writer->current_wal;
@@ -688,7 +708,7 @@ wal_fallocate(struct wal_writer *writer, size_t len)
 	 * of encoded rows (compression, fixheaders). Double the
 	 * given length to get a rough upper bound estimate.
 	 */
-	len *= 2;
+	size_t len = msg->approx_len * 2;
 
 retry:
 	if (errinj == NULL || errinj->iparam == 0) {
@@ -722,7 +742,9 @@ retry:
 	}
 	diag_destroy(&diag);
 
-	wal_notify_watchers(writer, WAL_EVENT_GC);
+	msg->gc_executed = true;
+	if (xdir_first_vclock(&writer->wal_dir, &msg->gc_vclock) < 0)
+		vclock_copy(&msg->gc_vclock, &writer->vclock);
 	goto retry;
 error:
 	diag_log();
@@ -816,7 +838,7 @@ wal_write_to_disk(struct cmsg *msg)
 	}
 
 	/* Ensure there's enough disk space before writing anything. */
-	if (wal_fallocate(writer, wal_msg->approx_len) != 0) {
+	if (wal_fallocate(writer, wal_msg) != 0) {
 		stailq_concat(&wal_msg->rollback, &wal_msg->commit);
 		return wal_writer_begin_rollback(writer);
 	}
@@ -1115,7 +1137,6 @@ wal_watcher_notify(struct wal_watcher *watcher, unsigned events)
 	assert(!rlist_empty(&watcher->next));
 
 	struct wal_watcher_msg *msg = &watcher->msg;
-	struct wal_writer *writer = &wal_writer_singleton;
 
 	events &= watcher->event_mask;
 	if (events == 0) {
@@ -1134,9 +1155,6 @@ wal_watcher_notify(struct wal_watcher *watcher, unsigned events)
 	}
 
 	msg->events = events;
-	if (xdir_first_vclock(&writer->wal_dir, &msg->gc_vclock) < 0)
-		vclock_copy(&msg->gc_vclock, &writer->vclock);
-
 	cmsg_init(&msg->cmsg, watcher->route);
 	cpipe_push(&watcher->watcher_pipe, &msg->cmsg);
 }
diff --git a/src/box/wal.h b/src/box/wal.h
index 5f3a66ce..e5079552 100644
--- a/src/box/wal.h
+++ b/src/box/wal.h
@@ -52,13 +52,23 @@ extern int wal_dir_lock;
 extern "C" {
 #endif /* defined(__cplusplus) */
 
+/**
+ * Callback invoked on behalf of the tx thread upon request
+ * completion if the WAL thread ran out of disk space while
+ * performing a request and had to delete some old WAL files
+ * in order to continue. The vclock of the oldest WAL row
+ * still stored on the instance is returned in @vclock.
+ */
+typedef void (*wal_on_garbage_collection_f)(const struct vclock *vclock);
+
 void
 wal_thread_start();
 
 int
 wal_init(enum wal_mode wal_mode, const char *wal_dirname, int64_t wal_max_rows,
 	 int64_t wal_max_size, const struct tt_uuid *instance_uuid,
-	 const struct vclock *vclock, const struct vclock *checkpoint_vclock);
+	 const struct vclock *vclock, const struct vclock *checkpoint_vclock,
+	 wal_on_garbage_collection_f on_garbage_collection);
 
 void
 wal_thread_stop();
@@ -73,8 +83,6 @@ struct wal_watcher_msg {
 	struct wal_watcher *watcher;
 	/** Bit mask of events, see wal_event. */
 	unsigned events;
-	/** VClock of the oldest stored WAL row. */
-	struct vclock gc_vclock;
 };
 
 enum wal_event {
@@ -82,11 +90,6 @@ enum wal_event {
 	WAL_EVENT_WRITE		= (1 << 0),
 	/** A new WAL is created. */
 	WAL_EVENT_ROTATE	= (1 << 1),
-	/**
-	 * The WAL thread ran out of disk space and had to delete
-	 * one or more old WAL files.
-	 **/
-	WAL_EVENT_GC		= (1 << 2),
 };
 
 struct wal_watcher {
-- 
2.11.0


* [PATCH 6/9] wal: simplify watcher API
  2018-11-28 16:14 [PATCH 0/9] Allow to limit size of WAL files Vladimir Davydov
                   ` (4 preceding siblings ...)
  2018-11-28 16:14 ` [PATCH 5/9] gc: do not use WAL watcher API for deactivating stale consumers Vladimir Davydov
@ 2018-11-28 16:14 ` Vladimir Davydov
  2018-11-29 17:33   ` [tarantool-patches] " Konstantin Osipov
  2018-11-28 16:14 ` [PATCH 7/9] box: rewrite checkpoint daemon in C Vladimir Davydov
                   ` (2 subsequent siblings)
  8 siblings, 1 reply; 28+ messages in thread
From: Vladimir Davydov @ 2018-11-28 16:14 UTC (permalink / raw)
  To: kostja; +Cc: tarantool-patches

This patch reverts changes done in order to make the WAL watcher API
suitable for notifying TX about WAL garbage collection triggered on
ENOSPC, namely:

 b073b0176704 wal: add event_mask to wal_watcher
 7077341ec5b3 wal: pass wal_watcher_msg to wal_watcher callback

We don't need them anymore, because now we piggyback the notification
on the WAL request message that triggered ENOSPC.
---
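With the simplified API the callback receives the watcher itself plus a
bit mask of events, and the owner recovers its own state from the
embedded watcher. A minimal standalone sketch of that pattern, using
hypothetical names (watcher, subscriber, EVENT_WRITE, EVENT_ROTATE) and
a local offsetof-based macro in place of container_of:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical event bits, modelled on a write/rotate pair. */
enum { EVENT_WRITE = 1 << 0, EVENT_ROTATE = 1 << 1 };

/* Minimal watcher: just the callback taking the watcher and events. */
struct watcher {
	void (*cb)(struct watcher *, unsigned events);
};

/* An owner that embeds a watcher, the way a relay would. */
struct subscriber {
	int id;
	bool saw_rotate;
	struct watcher watcher;
};

/* Recover the owner from a pointer to its embedded watcher. */
#define subscriber_of(w) \
	((struct subscriber *)((char *)(w) - offsetof(struct subscriber, watcher)))

static void
subscriber_on_event(struct watcher *w, unsigned events)
{
	struct subscriber *s = subscriber_of(w);
	/* The callback inspects the mask itself; there is no filter. */
	s->saw_rotate = (events & EVENT_ROTATE) != 0;
}

/* Deliver events to a watcher by calling its callback directly. */
static void
watcher_notify(struct watcher *w, unsigned events)
{
	w->cb(w, events);
}
```

Since there is no per-watcher event mask in this scheme, every callback
sees every event and decides on its own what to ignore.
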
 src/box/relay.cc | 12 ++++--------
 src/box/wal.c    | 27 ++++++---------------------
 src/box/wal.h    | 25 ++++++-------------------
 3 files changed, 16 insertions(+), 48 deletions(-)

diff --git a/src/box/relay.cc b/src/box/relay.cc
index 0034f99a..0a1e95af 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -406,12 +406,9 @@ relay_schedule_pending_gc(struct relay *relay, const struct vclock *vclock)
 }
 
 static void
-relay_process_wal_event(struct wal_watcher_msg *msg)
+relay_process_wal_event(struct wal_watcher *watcher, unsigned events)
 {
-	assert((msg->events & (WAL_EVENT_WRITE | WAL_EVENT_ROTATE)) != 0);
-
-	struct relay *relay = container_of(msg->watcher, struct relay,
-					   wal_watcher);
+	struct relay *relay = container_of(watcher, struct relay, wal_watcher);
 	if (relay->state != RELAY_FOLLOW) {
 		/*
 		 * Do not try to send anything to the replica
@@ -421,7 +418,7 @@ relay_process_wal_event(struct wal_watcher_msg *msg)
 	}
 	try {
 		recover_remaining_wals(relay->r, &relay->stream, NULL,
-				       (msg->events & WAL_EVENT_ROTATE) != 0);
+				       (events & WAL_EVENT_ROTATE) != 0);
 	} catch (Exception *e) {
 		e->log();
 		diag_move(diag_get(), &relay->diag);
@@ -507,8 +504,7 @@ relay_subscribe_f(va_list ap)
 	};
 	trigger_add(&r->on_close_log, &on_close_log);
 	wal_set_watcher(&relay->wal_watcher, cord_name(cord()),
-			relay_process_wal_event, cbus_process,
-			WAL_EVENT_WRITE | WAL_EVENT_ROTATE);
+			relay_process_wal_event, cbus_process);
 
 	relay_set_cord_name(relay->io.fd);
 
diff --git a/src/box/wal.c b/src/box/wal.c
index c47535e4..31385642 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -1137,13 +1137,6 @@ wal_watcher_notify(struct wal_watcher *watcher, unsigned events)
 	assert(!rlist_empty(&watcher->next));
 
 	struct wal_watcher_msg *msg = &watcher->msg;
-
-	events &= watcher->event_mask;
-	if (events == 0) {
-		/* The watcher isn't interested in this event. */
-		return;
-	}
-
 	if (msg->cmsg.route != NULL) {
 		/*
 		 * If the notification message is still en route,
@@ -1163,7 +1156,10 @@ static void
 wal_watcher_notify_perform(struct cmsg *cmsg)
 {
 	struct wal_watcher_msg *msg = (struct wal_watcher_msg *) cmsg;
-	msg->watcher->cb(msg);
+	struct wal_watcher *watcher = msg->watcher;
+	unsigned events = msg->events;
+
+	watcher->cb(watcher, events);
 }
 
 static void
@@ -1216,9 +1212,8 @@ wal_watcher_detach(void *arg)
 
 void
 wal_set_watcher(struct wal_watcher *watcher, const char *name,
-		void (*watcher_cb)(struct wal_watcher_msg *),
-		void (*process_cb)(struct cbus_endpoint *),
-		unsigned event_mask)
+		void (*watcher_cb)(struct wal_watcher *, unsigned events),
+		void (*process_cb)(struct cbus_endpoint *))
 {
 	assert(journal_is_initialized(&wal_writer_singleton.base));
 
@@ -1228,7 +1223,6 @@ wal_set_watcher(struct wal_watcher *watcher, const char *name,
 	watcher->msg.events = 0;
 	watcher->msg.cmsg.route = NULL;
 	watcher->pending_events = 0;
-	watcher->event_mask = event_mask;
 
 	assert(lengthof(watcher->route) == 2);
 	watcher->route[0] = (struct cmsg_hop)
@@ -1249,15 +1243,6 @@ wal_clear_watcher(struct wal_watcher *watcher,
 		    wal_watcher_detach, watcher, process_cb);
 }
 
-/**
- * Notify all interested watchers about a WAL event.
- *
- * XXX: Note, this function iterates over all registered watchers,
- * including those that are not interested in the given event.
- * This is OK only as long as the number of events/watchers is
- * small. If this ever changes, we should consider maintaining
- * a separate watcher list per each event type.
- */
 static void
 wal_notify_watchers(struct wal_writer *writer, unsigned events)
 {
diff --git a/src/box/wal.h b/src/box/wal.h
index e5079552..1e070625 100644
--- a/src/box/wal.h
+++ b/src/box/wal.h
@@ -73,15 +73,9 @@ wal_init(enum wal_mode wal_mode, const char *wal_dirname, int64_t wal_max_rows,
 void
 wal_thread_stop();
 
-/**
- * A notification message sent from the WAL to a watcher
- * when a WAL event occurs.
- */
 struct wal_watcher_msg {
 	struct cmsg cmsg;
-	/** Pointer to the watcher this message is for. */
 	struct wal_watcher *watcher;
-	/** Bit mask of events, see wal_event. */
 	unsigned events;
 };
 
@@ -96,7 +90,7 @@ struct wal_watcher {
 	/** Link in wal_writer::watchers. */
 	struct rlist next;
 	/** The watcher callback function. */
-	void (*cb)(struct wal_watcher_msg *);
+	void (*cb)(struct wal_watcher *, unsigned events);
 	/** Pipe from the watcher to WAL. */
 	struct cpipe wal_pipe;
 	/** Pipe from WAL to the watcher. */
@@ -106,11 +100,6 @@ struct wal_watcher {
 	/** Message sent to notify the watcher. */
 	struct wal_watcher_msg msg;
 	/**
-	 * Bit mask of WAL events that this watcher is
-	 * interested in.
-	 */
-	unsigned event_mask;
-	/**
 	 * Bit mask of WAL events that happened while
 	 * the notification message was en route.
 	 * It indicates that the message must be resend
@@ -135,19 +124,17 @@ struct wal_watcher {
  * @param watcher     WAL watcher to register.
  * @param name        Name of the cbus endpoint at the caller's cord.
  * @param watcher_cb  Callback to invoke from the caller's cord
- *                    upon receiving a WAL event. It takes an object
- *                    of type wal_watcher_msg that stores a pointer
- *                    to the watcher and information about the event.
+ *                    upon receiving a WAL event. Apart from the
+ *                    watcher itself, it takes a bit mask of events.
+ *                    Events are described in wal_event enum.
  * @param process_cb  Function called to process cbus messages
  *                    while the watcher is being attached or NULL
  *                    if the cbus loop is running elsewhere.
- * @param event_mask  Bit mask of events the watcher is interested in.
  */
 void
 wal_set_watcher(struct wal_watcher *watcher, const char *name,
-		void (*watcher_cb)(struct wal_watcher_msg *),
-		void (*process_cb)(struct cbus_endpoint *),
-		unsigned event_mask);
+		void (*watcher_cb)(struct wal_watcher *, unsigned events),
+		void (*process_cb)(struct cbus_endpoint *));
 
 /**
  * Unsubscribe from WAL events.
-- 
2.11.0


* [PATCH 7/9] box: rewrite checkpoint daemon in C
  2018-11-28 16:14 [PATCH 0/9] Allow to limit size of WAL files Vladimir Davydov
                   ` (5 preceding siblings ...)
  2018-11-28 16:14 ` [PATCH 6/9] wal: simplify watcher API Vladimir Davydov
@ 2018-11-28 16:14 ` Vladimir Davydov
  2018-11-30  8:58   ` [tarantool-patches] " Konstantin Osipov
  2018-11-28 16:14 ` [PATCH 8/9] wal: pass struct instead of vclock to checkpoint methods Vladimir Davydov
  2018-11-28 16:14 ` [PATCH 9/9] wal: trigger checkpoint if there are too many WALs Vladimir Davydov
  8 siblings, 1 reply; 28+ messages in thread
From: Vladimir Davydov @ 2018-11-28 16:14 UTC (permalink / raw)
  To: kostja; +Cc: tarantool-patches

A long time ago, when the checkpoint daemon was added to Tarantool, it
was responsible not only for making periodic checkpoints, but also for
maintaining the configured number of checkpoints and removing old snap
and xlog files, so it was much easier to implement it in Lua than in C.
However, over time, all its responsibilities have been reimplemented in
C and moved to the server code so that now it just calls box.snapshot()
periodically. Let's rewrite this simple procedure in C as well - this
will allow us to easily add more complex logic there, e.g. triggering
a checkpoint when WAL files exceed a configured threshold.
---
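The C daemon in this patch delays the first checkpoint by the configured
interval plus a random offset, so that several instances on one host do
not checkpoint at the same moment. The arithmetic can be isolated as
below; the function name initial_checkpoint_timeout is hypothetical, and
the sketch assumes a positive interval, as the patch does for this
branch.

```c
#include <assert.h>
#include <stdlib.h>

/*
 * Delay before the first periodic checkpoint: the interval itself
 * plus a whole number of seconds in [0, (int)interval].
 */
static double
initial_checkpoint_timeout(double interval)
{
	assert(interval > 0);
	return interval + rand() % ((int)interval + 1);
}
```

For an interval of 3600 seconds the result lies between 3600 and 7200
seconds. For intervals below one second the offset is always zero,
because the integer part of the interval is zero.
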
 src/box/CMakeLists.txt               |   1 -
 src/box/box.cc                       | 102 ++++++++++++++++++++++++
 src/box/box.h                        |   1 +
 src/box/lua/cfg.cc                   |  12 +++
 src/box/lua/checkpoint_daemon.lua    | 136 --------------------------------
 src/box/lua/init.c                   |   2 -
 src/box/lua/load_cfg.lua             |   2 +-
 test/xlog/checkpoint_daemon.result   | 145 -----------------------------------
 test/xlog/checkpoint_daemon.test.lua |  56 --------------
 9 files changed, 116 insertions(+), 341 deletions(-)
 delete mode 100644 src/box/lua/checkpoint_daemon.lua

diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt
index d1276472..b2aaa1c3 100644
--- a/src/box/CMakeLists.txt
+++ b/src/box/CMakeLists.txt
@@ -7,7 +7,6 @@ lua_source(lua_sources lua/load_cfg.lua)
 lua_source(lua_sources lua/schema.lua)
 lua_source(lua_sources lua/tuple.lua)
 lua_source(lua_sources lua/session.lua)
-lua_source(lua_sources lua/checkpoint_daemon.lua)
 lua_source(lua_sources lua/feedback_daemon.lua)
 lua_source(lua_sources lua/net_box.lua)
 lua_source(lua_sources lua/upgrade.lua)
diff --git a/src/box/box.cc b/src/box/box.cc
index 20412af4..7cb96cd6 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -91,6 +91,22 @@ bool box_checkpoint_is_in_progress = false;
 const struct vclock *box_vclock = &replicaset.vclock;
 
 /**
+ * Fiber that performs periodic checkpointing.
+ */
+static struct fiber *checkpoint_daemon;
+
+/**
+ * Interval between checkpoints, in seconds.
+ */
+static double checkpoint_interval;
+
+/**
+ * Time of the next scheduled checkpoint that will be
+ * performed by the checkpoint daemon.
+ */
+static double next_checkpoint_time;
+
+/**
  * Set if backup is in progress, i.e. box_backup_start() was
  * called but box_backup_stop() hasn't been yet.
  */
@@ -361,6 +377,60 @@ apply_initial_join_row(struct xstream *stream, struct xrow_header *row)
 	space_apply_initial_join_row_xc(space, &request);
 }
 
+static int
+checkpoint_daemon_f(va_list ap)
+{
+	(void)ap;
+	assert(checkpoint_daemon == fiber());
+	while (!fiber_is_cancelled()) {
+		double now = ev_monotonic_now(loop());
+		if (now < next_checkpoint_time) {
+			fiber_sleep(next_checkpoint_time - now);
+			continue;
+		}
+		if (box_checkpoint_is_in_progress) {
+			/*
+			 * The next checkpoint will be scheduled
+			 * by the concurrent box_checkpoint().
+			 */
+			next_checkpoint_time = now + TIMEOUT_INFINITY;
+			continue;
+		}
+		box_checkpoint();
+	}
+	checkpoint_daemon = NULL;
+	return 0;
+}
+
+static void
+start_checkpoint_daemon(void)
+{
+	assert(checkpoint_daemon == NULL);
+	checkpoint_daemon = fiber_new("checkpoint_daemon", checkpoint_daemon_f);
+	if (checkpoint_daemon == NULL)
+		panic("failed to start checkpoint daemon");
+	next_checkpoint_time = ev_monotonic_now(loop()) + TIMEOUT_INFINITY;
+	fiber_wakeup(checkpoint_daemon);
+}
+
+static void
+schedule_next_checkpoint(double timeout)
+{
+	if (checkpoint_daemon == NULL)
+		return;
+
+	next_checkpoint_time = ev_monotonic_now(loop()) + timeout;
+	if (checkpoint_daemon != fiber())
+		fiber_wakeup(checkpoint_daemon);
+
+	char buf[128];
+	struct tm tm;
+	time_t time = (time_t)ev_now(loop()) + timeout;
+	localtime_r(&time, &tm);
+	strftime(buf, sizeof(buf), "%c", &tm);
+	say_info("scheduled next checkpoint for %s", buf);
+}
+
 /* {{{ configuration bindings */
 
 static void
@@ -844,6 +914,30 @@ box_set_readahead(void)
 }
 
 void
+box_set_checkpoint_interval(void)
+{
+	checkpoint_interval = cfg_getd("checkpoint_interval");
+	if (checkpoint_interval > 0) {
+		/*
+		 * Add a random offset to the initial period to avoid
+		 * simultaneous checkpointing when multiple instances
+		 * are running on the same host.
+		 */
+		double timeout = checkpoint_interval +
+				rand() % ((int)checkpoint_interval + 1);
+		schedule_next_checkpoint(timeout);
+	} else {
+		/*
+		 * Effectively disable periodic checkpointing by
+		 * setting the next checkpoint time to a very large
+		 * value.
+		 */
+		next_checkpoint_time = ev_monotonic_now(loop()) +
+						TIMEOUT_INFINITY;
+	}
+}
+
+void
 box_set_checkpoint_count(void)
 {
 	int checkpoint_count = cfg_geti("checkpoint_count");
@@ -2141,6 +2235,7 @@ box_cfg_xc(void)
 	replicaset_follow();
 
 	sql_load_schema();
+	start_checkpoint_daemon();
 
 	fiber_gc();
 	is_box_configured = true;
@@ -2210,6 +2305,13 @@ end:
 	box_checkpoint_is_in_progress = false;
 
 	/*
+	 * Schedule the next checkpoint if periodic checkpointing
+	 * is configured.
+	 */
+	if (checkpoint_interval > 0)
+		schedule_next_checkpoint(checkpoint_interval);
+
+	/*
 	 * Wait for background garbage collection that might
 	 * have been triggered by this checkpoint to complete.
 	 * Strictly speaking, it isn't necessary, but it
diff --git a/src/box/box.h b/src/box/box.h
index 7712c192..9bde583d 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -199,6 +199,7 @@ void box_set_snap_io_rate_limit(void);
 void box_set_too_long_threshold(void);
 void box_set_readahead(void);
 void box_set_checkpoint_count(void);
+void box_set_checkpoint_interval(void);
 void box_set_memtx_memory(void);
 void box_set_memtx_max_tuple_size(void);
 void box_set_vinyl_memory(void);
diff --git a/src/box/lua/cfg.cc b/src/box/lua/cfg.cc
index c3825591..4f08c78e 100644
--- a/src/box/lua/cfg.cc
+++ b/src/box/lua/cfg.cc
@@ -165,6 +165,17 @@ lbox_cfg_set_checkpoint_count(struct lua_State *L)
 }
 
 static int
+lbox_cfg_set_checkpoint_interval(struct lua_State *L)
+{
+	try {
+		box_set_checkpoint_interval();
+	} catch (Exception *) {
+		luaT_error(L);
+	}
+	return 0;
+}
+
+static int
 lbox_cfg_set_read_only(struct lua_State *L)
 {
 	try {
@@ -340,6 +351,7 @@ box_lua_cfg_init(struct lua_State *L)
 		{"cfg_set_too_long_threshold", lbox_cfg_set_too_long_threshold},
 		{"cfg_set_snap_io_rate_limit", lbox_cfg_set_snap_io_rate_limit},
 		{"cfg_set_checkpoint_count", lbox_cfg_set_checkpoint_count},
+		{"cfg_set_checkpoint_interval", lbox_cfg_set_checkpoint_interval},
 		{"cfg_set_read_only", lbox_cfg_set_read_only},
 		{"cfg_set_memtx_memory", lbox_cfg_set_memtx_memory},
 		{"cfg_set_memtx_max_tuple_size", lbox_cfg_set_memtx_max_tuple_size},
diff --git a/src/box/lua/checkpoint_daemon.lua b/src/box/lua/checkpoint_daemon.lua
deleted file mode 100644
index 576c4a5c..00000000
--- a/src/box/lua/checkpoint_daemon.lua
+++ /dev/null
@@ -1,136 +0,0 @@
--- checkpoint_daemon.lua (internal file)
-
-local log = require 'log'
-local fiber = require 'fiber'
-local fio = require 'fio'
-local yaml = require 'yaml'
-local errno = require 'errno'
-local digest = require 'digest'
-local pickle = require 'pickle'
-
-local PREFIX = 'checkpoint_daemon'
-
-local daemon = {
-    checkpoint_interval = 0;
-    fiber = nil;
-    control = nil;
-}
-
--- create snapshot, return true if no errors
-local function snapshot()
-    log.info("making snapshot...")
-    local s, e = pcall(function() box.snapshot() end)
-    if s then
-        return true
-    end
-    -- don't complain in the log if the snapshot already exists
-    if errno() == errno.EEXIST then
-        return false
-    end
-    log.error("error while creating snapshot: %s", e)
-    return false
-end
-
--- check filesystem and current time
-local function process(self)
-
-    if daemon.checkpoint_interval == nil then
-        return false
-    end
-
-    if not(daemon.checkpoint_interval > 0) then
-        return false
-    end
-
-    local checkpoints = box.info.gc().checkpoints
-    local last_checkpoint = checkpoints[#checkpoints]
-
-    local last_snap = fio.pathjoin(box.cfg.memtx_dir,
-            string.format('%020d.snap', last_checkpoint.signature))
-    local snstat = fio.stat(last_snap)
-    if snstat == nil then
-        log.error("can't stat %s: %s", last_snap, errno.strerror())
-        return false
-    end
-    if snstat.mtime + daemon.checkpoint_interval <= fiber.time() then
-        return snapshot()
-    end
-end
-
-local function daemon_fiber(self)
-    fiber.name(PREFIX, {truncate = true})
-    log.info("started")
-
-    --
-    -- Add random offset to the initial period to avoid simultaneous
-    -- snapshotting when multiple instances of tarantool are running
-    -- on the same host.
-    -- See https://github.com/tarantool/tarantool/issues/732
-    --
-    local random = pickle.unpack('i', digest.urandom(4))
-    local offset = random % self.checkpoint_interval
-    while true do
-        local period = self.checkpoint_interval + offset
-        -- maintain next_snapshot_time as a self member for testing purposes
-        self.next_snapshot_time = fiber.time() + period
-        log.info("scheduled the next snapshot at %s",
-                os.date("%c", self.next_snapshot_time))
-        local msg = self.control:get(period)
-        if msg == 'shutdown' then
-            break
-        elseif msg == 'reload' then
-            offset = random % self.checkpoint_interval
-            log.info("reloaded") -- continue
-        elseif msg == nil and box.info.status == 'running' then
-            local s, e = pcall(process, self)
-            if not s then
-                log.error(e)
-            end
-            offset = 0
-        end
-    end
-    self.next_snapshot_time = nil
-    log.info("stopped")
-end
-
-local function reload(self)
-    if self.checkpoint_interval > 0 then
-        if self.control == nil then
-            -- Start daemon
-            self.control = fiber.channel()
-            self.fiber = fiber.create(daemon_fiber, self)
-            fiber.sleep(0)
-        else
-            -- Reload daemon
-            self.control:put("reload")
-            --
-            -- channel:put() doesn't block the writer if there
-            -- is a ready reader. Give daemon fiber way so that
-            -- it can execute before reload() returns to the caller.
-            --
-            fiber.sleep(0)
-        end
-    elseif self.control ~= nil then
-        -- Shutdown daemon
-        self.control:put("shutdown")
-        self.fiber = nil
-        self.control = nil
-        fiber.sleep(0) -- see comment above
-    end
-end
-
-setmetatable(daemon, {
-    __index = {
-        set_checkpoint_interval = function()
-            daemon.checkpoint_interval = box.cfg.checkpoint_interval
-            reload(daemon)
-            return
-        end,
-    }
-})
-
-if box.internal == nil then
-    box.internal = { [PREFIX] = daemon }
-else
-    box.internal[PREFIX] = daemon
-end
diff --git a/src/box/lua/init.c b/src/box/lua/init.c
index ccb4c6a4..0e90f6be 100644
--- a/src/box/lua/init.c
+++ b/src/box/lua/init.c
@@ -65,7 +65,6 @@ extern char session_lua[],
 	schema_lua[],
 	load_cfg_lua[],
 	xlog_lua[],
-	checkpoint_daemon_lua[],
 	feedback_daemon_lua[],
 	net_box_lua[],
 	upgrade_lua[],
@@ -75,7 +74,6 @@ static const char *lua_sources[] = {
 	"box/session", session_lua,
 	"box/tuple", tuple_lua,
 	"box/schema", schema_lua,
-	"box/checkpoint_daemon", checkpoint_daemon_lua,
 	"box/feedback_daemon", feedback_daemon_lua,
 	"box/upgrade", upgrade_lua,
 	"box/net_box", net_box_lua,
diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua
index fd99206f..a9e6fe07 100644
--- a/src/box/lua/load_cfg.lua
+++ b/src/box/lua/load_cfg.lua
@@ -227,7 +227,7 @@ local dynamic_cfg = {
     vinyl_cache             = private.cfg_set_vinyl_cache,
     vinyl_timeout           = private.cfg_set_vinyl_timeout,
     checkpoint_count        = private.cfg_set_checkpoint_count,
-    checkpoint_interval     = private.checkpoint_daemon.set_checkpoint_interval,
+    checkpoint_interval     = private.cfg_set_checkpoint_interval,
     worker_pool_threads     = private.cfg_set_worker_pool_threads,
     feedback_enabled        = private.feedback_daemon.set_feedback_params,
     feedback_host           = private.feedback_daemon.set_feedback_params,
diff --git a/test/xlog/checkpoint_daemon.result b/test/xlog/checkpoint_daemon.result
index 3a75137d..f1d8690a 100644
--- a/test/xlog/checkpoint_daemon.result
+++ b/test/xlog/checkpoint_daemon.result
@@ -151,148 +151,3 @@ box.cfg{checkpoint_interval = 3600 * 4, checkpoint_count = 4 }
 space:drop()
 ---
 ...
-daemon = box.internal.checkpoint_daemon
----
-...
--- stop daemon
-box.cfg{ checkpoint_interval = 0 }
----
-...
--- wait daemon to stop
-while daemon.fiber ~= nil do fiber.sleep(0) end
----
-...
-daemon.fiber == nil
----
-- true
-...
--- start daemon
-box.cfg{ checkpoint_interval = 10 }
----
-...
-daemon.fiber ~= nil
----
-- true
-...
--- reload configuration
-box.cfg{ checkpoint_interval = 15, checkpoint_count = 20 }
----
-...
-daemon.checkpoint_interval == 15
----
-- true
-...
-daemon.checkpoint_count = 20
----
-...
--- Check that checkpoint_count can't be < 1.
-box.cfg{ checkpoint_count = 1 }
----
-...
-box.cfg{ checkpoint_count = 0 }
----
-- error: 'Incorrect value for option ''checkpoint_count'': the value must not be less
-    than one'
-...
-box.cfg.checkpoint_count
----
-- 1
-...
--- Start
-PERIOD = 3600
----
-...
-box.cfg{ checkpoint_count = 2, checkpoint_interval = PERIOD}
----
-...
-snapshot_time, time  = daemon.next_snapshot_time, fiber.time()
----
-...
-snapshot_time + 1 >= time + PERIOD or {snapshot_time, time, PERIOD}
----
-- true
-...
-snapshot_time - 1 <= time + 2 * PERIOD or {snapshot_time, time, PERIOD}
----
-- true
-...
-daemon_fiber = daemon.fiber
----
-...
-daemon_control = daemon.control
----
-...
--- Reload #1
-PERIOD = 100
----
-...
-box.cfg{ checkpoint_count = 2, checkpoint_interval = PERIOD}
----
-...
-snapshot_time, time  = daemon.next_snapshot_time, fiber.time()
----
-...
-snapshot_time + 1 >= time + PERIOD or {snapshot_time, time, PERIOD}
----
-- true
-...
-snapshot_time - 1 <= time + 2 * PERIOD or {snapshot_time, time, PERIOD}
----
-- true
-...
-daemon.fiber == daemon_fiber
----
-- true
-...
-daemon.control == daemon_control
----
-- true
-...
--- Reload #2
-PERIOD = 1000
----
-...
-box.cfg{ checkpoint_count = 2, checkpoint_interval = PERIOD}
----
-...
-snapshot_time, time  = daemon.next_snapshot_time, fiber.time()
----
-...
-snapshot_time + 1 >= time + PERIOD or {snapshot_time, time, PERIOD}
----
-- true
-...
-snapshot_time - 1 <= time + 2 * PERIOD or {snapshot_time, time, PERIOD}
----
-- true
-...
-daemon.fiber == daemon_fiber
----
-- true
-...
-daemon.control == daemon_control
----
-- true
-...
-daemon_control = nil
----
-...
-daemin_fiber = nil
----
-...
--- Shutdown
-box.cfg{ checkpoint_count = 2, checkpoint_interval = 0}
----
-...
-daemon.next_snapshot_time
----
-- null
-...
-daemon.fiber == nil
----
-- true
-...
-daemon.control == nil
----
-- true
-...
diff --git a/test/xlog/checkpoint_daemon.test.lua b/test/xlog/checkpoint_daemon.test.lua
index f3490621..b67879c4 100644
--- a/test/xlog/checkpoint_daemon.test.lua
+++ b/test/xlog/checkpoint_daemon.test.lua
@@ -88,59 +88,3 @@ test_run:cmd("setopt delimiter ''");
 -- restore default options
 box.cfg{checkpoint_interval = 3600 * 4, checkpoint_count = 4 }
 space:drop()
-
-daemon = box.internal.checkpoint_daemon
--- stop daemon
-box.cfg{ checkpoint_interval = 0 }
--- wait daemon to stop
-while daemon.fiber ~= nil do fiber.sleep(0) end
-daemon.fiber == nil
--- start daemon
-box.cfg{ checkpoint_interval = 10 }
-daemon.fiber ~= nil
--- reload configuration
-box.cfg{ checkpoint_interval = 15, checkpoint_count = 20 }
-daemon.checkpoint_interval == 15
-daemon.checkpoint_count = 20
-
--- Check that checkpoint_count can't be < 1.
-box.cfg{ checkpoint_count = 1 }
-box.cfg{ checkpoint_count = 0 }
-box.cfg.checkpoint_count
-
--- Start
-PERIOD = 3600
-box.cfg{ checkpoint_count = 2, checkpoint_interval = PERIOD}
-snapshot_time, time  = daemon.next_snapshot_time, fiber.time()
-snapshot_time + 1 >= time + PERIOD or {snapshot_time, time, PERIOD}
-snapshot_time - 1 <= time + 2 * PERIOD or {snapshot_time, time, PERIOD}
-
-daemon_fiber = daemon.fiber
-daemon_control = daemon.control
-
--- Reload #1
-PERIOD = 100
-box.cfg{ checkpoint_count = 2, checkpoint_interval = PERIOD}
-snapshot_time, time  = daemon.next_snapshot_time, fiber.time()
-snapshot_time + 1 >= time + PERIOD or {snapshot_time, time, PERIOD}
-snapshot_time - 1 <= time + 2 * PERIOD or {snapshot_time, time, PERIOD}
-daemon.fiber == daemon_fiber
-daemon.control == daemon_control
-
--- Reload #2
-PERIOD = 1000
-box.cfg{ checkpoint_count = 2, checkpoint_interval = PERIOD}
-snapshot_time, time  = daemon.next_snapshot_time, fiber.time()
-snapshot_time + 1 >= time + PERIOD or {snapshot_time, time, PERIOD}
-snapshot_time - 1 <= time + 2 * PERIOD or {snapshot_time, time, PERIOD}
-daemon.fiber == daemon_fiber
-daemon.control == daemon_control
-
-daemon_control = nil
-daemin_fiber = nil
-
--- Shutdown
-box.cfg{ checkpoint_count = 2, checkpoint_interval = 0}
-daemon.next_snapshot_time
-daemon.fiber == nil
-daemon.control == nil
-- 
2.11.0
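
For reference, the initial delay computed by box_set_checkpoint_interval()
in the patch above can be isolated into a small function. This is only a
sketch: initial_checkpoint_timeout() is a hypothetical name, and the random
value is passed in as a parameter (instead of calling rand() directly) so
that the arithmetic can be checked deterministically.

```c
#include <assert.h>
#include <stdlib.h>

/*
 * Hypothetical helper, not part of the patch. It mirrors the
 * expression interval + rand() % ((int)interval + 1): the first
 * delay is the interval plus a whole number of seconds in the
 * range [0, (int)interval].
 */
static double
initial_checkpoint_timeout(double interval, int random_value)
{
	assert(interval > 0);
	assert(random_value >= 0);
	return interval + random_value % ((int)interval + 1);
}
```

With interval = 3600 the first checkpoint is therefore scheduled somewhere
between one and two intervals from now, which matches the bounds that the
removed Lua test used to check. Subsequent checkpoints are scheduled with
the plain interval.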


* [PATCH 8/9] wal: pass struct instead of vclock to checkpoint methods
  2018-11-28 16:14 [PATCH 0/9] Allow to limit size of WAL files Vladimir Davydov
                   ` (6 preceding siblings ...)
  2018-11-28 16:14 ` [PATCH 7/9] box: rewrite checkpoint daemon in C Vladimir Davydov
@ 2018-11-28 16:14 ` Vladimir Davydov
  2018-11-30  9:00   ` [tarantool-patches] " Konstantin Osipov
  2018-11-28 16:14 ` [PATCH 9/9] wal: trigger checkpoint if there are too many WALs Vladimir Davydov
  8 siblings, 1 reply; 28+ messages in thread
From: Vladimir Davydov @ 2018-11-28 16:14 UTC (permalink / raw)
  To: kostja; +Cc: tarantool-patches

Currently, we only need to pass a vclock between TX and WAL during
checkpointing. However, in order to implement auto-checkpointing
triggered when WAL size exceeds a certain threshold, we will need to
pass some extra info so that we can properly reset the counter that
tracks the WAL size in the WAL thread. To make this possible, let's
move the wal_checkpoint struct, which is used internally by WAL to pass
a checkpoint vclock, to the header and require the caller to pass it to
wal_begin/commit_checkpoint instead of just a vclock.
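
A rough sketch of the resulting calling convention, using simplified
stand-in types. Everything prefixed with toy_ is hypothetical: the real
struct wal_checkpoint also embeds a cbus_call_msg, and the wal_size field
below only illustrates the kind of extra info that can ride along once a
struct is passed instead of a bare vclock.

```c
#include <assert.h>
#include <stdint.h>

/* Simplified stand-in for struct vclock (an assumption). */
struct toy_vclock {
	int64_t signature;
};

/* Simplified stand-in for struct wal_checkpoint. */
struct toy_checkpoint {
	struct toy_vclock vclock;
	/* Hypothetical extra field carried between begin and commit. */
	int64_t wal_size;
};

/* Simplified stand-in for the WAL writer state. */
struct toy_wal {
	struct toy_vclock vclock;
	struct toy_vclock checkpoint_vclock;
	int64_t wal_size;
};

/* Fill in the checkpoint info; the caller owns the struct. */
static int
toy_begin_checkpoint(struct toy_wal *wal, struct toy_checkpoint *cp)
{
	cp->vclock = wal->vclock;
	cp->wal_size = wal->wal_size;
	return 0;
}

/* Consume the same struct that toy_begin_checkpoint() filled in. */
static void
toy_commit_checkpoint(struct toy_wal *wal, const struct toy_checkpoint *cp)
{
	wal->checkpoint_vclock = cp->vclock;
	wal->wal_size -= cp->wal_size;
}
```

The point is that the caller keeps one object alive across both calls, so
new fields can be added later without touching the function signatures.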
---
 src/box/box.cc | 10 +++++-----
 src/box/wal.c  | 22 ++++++----------------
 src/box/wal.h  | 20 +++++++++++++++-----
 3 files changed, 26 insertions(+), 26 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index 7cb96cd6..24ddd941 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -2288,15 +2288,15 @@ box_checkpoint()
 	if ((rc = engine_begin_checkpoint()))
 		goto end;
 
-	struct vclock vclock;
-	if ((rc = wal_begin_checkpoint(&vclock)))
+	struct wal_checkpoint checkpoint;
+	if ((rc = wal_begin_checkpoint(&checkpoint)))
 		goto end;
 
-	if ((rc = engine_commit_checkpoint(&vclock)))
+	if ((rc = engine_commit_checkpoint(&checkpoint.vclock)))
 		goto end;
 
-	wal_commit_checkpoint(&vclock);
-	gc_add_checkpoint(&vclock);
+	wal_commit_checkpoint(&checkpoint);
+	gc_add_checkpoint(&checkpoint.vclock);
 end:
 	if (rc)
 		engine_abort_checkpoint();
diff --git a/src/box/wal.c b/src/box/wal.c
index 31385642..bbb8376c 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -490,12 +490,6 @@ wal_flush(void)
 	cbus_flush(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe, NULL);
 }
 
-struct wal_checkpoint
-{
-	struct cbus_call_msg base;
-	struct vclock vclock;
-};
-
 static int
 wal_begin_checkpoint_f(struct cbus_call_msg *data)
 {
@@ -527,11 +521,11 @@ wal_begin_checkpoint_f(struct cbus_call_msg *data)
 }
 
 int
-wal_begin_checkpoint(struct vclock *vclock)
+wal_begin_checkpoint(struct wal_checkpoint *checkpoint)
 {
 	struct wal_writer *writer = &wal_writer_singleton;
 	if (writer->wal_mode == WAL_NONE) {
-		vclock_copy(vclock, &writer->vclock);
+		vclock_copy(&checkpoint->vclock, &writer->vclock);
 		return 0;
 	}
 	if (!stailq_empty(&writer->rollback)) {
@@ -545,15 +539,13 @@ wal_begin_checkpoint(struct vclock *vclock)
 		diag_set(ClientError, ER_CHECKPOINT_ROLLBACK);
 		return -1;
 	}
-	struct wal_checkpoint msg;
 	bool cancellable = fiber_set_cancellable(false);
 	int rc = cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe,
-			   &msg.base, wal_begin_checkpoint_f, NULL,
+			   &checkpoint->base, wal_begin_checkpoint_f, NULL,
 			   TIMEOUT_INFINITY);
 	fiber_set_cancellable(cancellable);
 	if (rc != 0)
 		return -1;
-	vclock_copy(vclock, &msg.vclock);
 	return 0;
 }
 
@@ -567,18 +559,16 @@ wal_commit_checkpoint_f(struct cbus_call_msg *data)
 }
 
 void
-wal_commit_checkpoint(const struct vclock *vclock)
+wal_commit_checkpoint(struct wal_checkpoint *checkpoint)
 {
 	struct wal_writer *writer = &wal_writer_singleton;
 	if (writer->wal_mode == WAL_NONE) {
-		vclock_copy(&writer->checkpoint_vclock, vclock);
+		vclock_copy(&writer->checkpoint_vclock, &checkpoint->vclock);
 		return;
 	}
-	struct wal_checkpoint msg;
-	vclock_copy(&msg.vclock, vclock);
 	bool cancellable = fiber_set_cancellable(false);
 	cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe,
-		  &msg.base, wal_commit_checkpoint_f, NULL,
+		  &checkpoint->base, wal_commit_checkpoint_f, NULL,
 		  TIMEOUT_INFINITY);
 	fiber_set_cancellable(cancellable);
 }
diff --git a/src/box/wal.h b/src/box/wal.h
index 1e070625..3f0fae07 100644
--- a/src/box/wal.h
+++ b/src/box/wal.h
@@ -160,16 +160,26 @@ wal_mode();
 void
 wal_flush(void);
 
+struct wal_checkpoint {
+	struct cbus_call_msg base;
+	/**
+	 * VClock of the last record written to the rotated WAL.
+	 * This is the vclock that is supposed to be used to
+	 * identify the new checkpoint.
+	 */
+	struct vclock vclock;
+};
+
 /**
  * Prepare WAL for checkpointing.
  *
  * This function flushes all pending changes and rotates the
- * current WAL. The vclock of the last record written to the
- * rotated WAL is returned in @vclock. This is the vclock that
- * is supposed to be used to identify the new checkpoint.
+ * current WAL. Checkpoint info is returned in @checkpoint.
+ * It is supposed to be passed to wal_commit_checkpoint()
+ * upon successful checkpoint creation.
  */
 int
-wal_begin_checkpoint(struct vclock *vclock);
+wal_begin_checkpoint(struct wal_checkpoint *checkpoint);
 
 /**
  * This function is called upon successful checkpoint creation.
@@ -177,7 +187,7 @@ wal_begin_checkpoint(struct vclock *vclock);
  * vclock.
  */
 void
-wal_commit_checkpoint(const struct vclock *vclock);
+wal_commit_checkpoint(struct wal_checkpoint *checkpoint);
 
 /**
  * Remove WAL files that are not needed by consumers reading
-- 
2.11.0


* [PATCH 9/9] wal: trigger checkpoint if there are too many WALs
  2018-11-28 16:14 [PATCH 0/9] Allow to limit size of WAL files Vladimir Davydov
                   ` (7 preceding siblings ...)
  2018-11-28 16:14 ` [PATCH 8/9] wal: pass struct instead of vclock to checkpoint methods Vladimir Davydov
@ 2018-11-28 16:14 ` Vladimir Davydov
  2018-12-03 20:34   ` [tarantool-patches] " Konstantin Osipov
  8 siblings, 1 reply; 28+ messages in thread
From: Vladimir Davydov @ 2018-11-28 16:14 UTC (permalink / raw)
  To: kostja; +Cc: tarantool-patches

Closes #1082

@TarantoolBot document
Title: Document box.cfg.checkpoint_wal_threshold

Tarantool makes checkpoints every box.cfg.checkpoint_interval seconds
and keeps the last box.cfg.checkpoint_count checkpoints. It also keeps
all intermediate WAL files. Currently, it isn't possible to set a checkpoint
trigger based on the sum size of WAL files, which makes it difficult to
estimate the minimal amount of disk space that needs to be allotted to a
Tarantool instance for storing WALs to eliminate the possibility of
ENOSPC errors. For example, under normal conditions a Tarantool instance
may write 1 GB of WAL files every box.cfg.checkpoint_interval seconds
and so one may assume that 1 GB times box.cfg.checkpoint_count should be
enough for the WAL partition, but there's no guarantee it won't write 10
GB between checkpoints when the load is extreme.

So we've agreed that we must provide users with one more configuration
option that can be used to impose a limit on the total size of WAL
files. The new option is called box.cfg.checkpoint_wal_threshold. Once
the configured threshold is exceeded, the WAL thread notifies the
checkpoint daemon that it's time to make a new checkpoint and delete
old WAL files. Note, the new option only limits the size of WAL files
created since the last checkpoint, because backup WAL files are not
needed for recovery and can be deleted in case of emergency ENOSPC.
For more details, see tarantool/tarantool#1082,
tarantool/tarantool#3397, and tarantool/tarantool#3822.
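
The accounting described above can be modelled in isolation roughly as
follows. This is a sketch under simplifying assumptions, not the actual
WAL code: struct toy_writer and both functions are hypothetical, there is
no cross-thread messaging, and the notification is reduced to a boolean
return value.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical, single-threaded model of the WAL writer counters. */
struct toy_writer {
	/* Bytes written to WAL files since the last checkpoint. */
	int64_t checkpoint_wal_size;
	/* Limit on the above; 0 disables the trigger. */
	int64_t checkpoint_threshold;
	/* Set once the threshold was reported, until a checkpoint completes. */
	bool threshold_signalled;
};

/*
 * Account for newly written bytes. Returns true exactly once per
 * checkpoint cycle, when the threshold is first exceeded, so the
 * caller does not get notified over and over again.
 */
static bool
toy_account_write(struct toy_writer *w, int64_t written)
{
	w->checkpoint_wal_size += written;
	if (w->checkpoint_threshold > 0 &&
	    !w->threshold_signalled &&
	    w->checkpoint_wal_size > w->checkpoint_threshold) {
		w->threshold_signalled = true;
		return true;
	}
	return false;
}

/*
 * Called when a checkpoint completes. Records written while the
 * checkpoint was being made still count towards the next one, so
 * subtract the size observed at checkpoint start instead of
 * resetting the counter to zero.
 */
static void
toy_checkpoint_done(struct toy_writer *w, int64_t size_at_begin)
{
	assert(w->checkpoint_wal_size >= size_at_begin);
	w->checkpoint_wal_size -= size_at_begin;
	w->threshold_signalled = false;
}
```

For example, with a threshold of 100 bytes, two writes of 60 bytes trigger
one notification on the second write; further writes stay silent until
toy_checkpoint_done() clears the flag.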
---
 src/box/box.cc                          |  21 +++++-
 src/box/box.h                           |   1 +
 src/box/lua/cfg.cc                      |  12 ++++
 src/box/lua/load_cfg.lua                |   3 +
 src/box/wal.c                           | 106 ++++++++++++++++++++++++++++--
 src/box/wal.h                           |  25 ++++++-
 test/app-tap/init_script.result         |  87 +++++++++++++------------
 test/box/admin.result                   |   2 +
 test/box/cfg.result                     |   4 ++
 test/xlog/checkpoint_threshold.result   | 112 ++++++++++++++++++++++++++++++++
 test/xlog/checkpoint_threshold.test.lua |  62 ++++++++++++++++++
 test/xlog/suite.ini                     |   2 +-
 12 files changed, 385 insertions(+), 52 deletions(-)
 create mode 100644 test/xlog/checkpoint_threshold.result
 create mode 100644 test/xlog/checkpoint_threshold.test.lua

diff --git a/src/box/box.cc b/src/box/box.cc
index 24ddd941..1af08ea3 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -938,6 +938,13 @@ box_set_checkpoint_interval(void)
 }
 
 void
+box_set_checkpoint_wal_threshold(void)
+{
+	int64_t checkpoint_threshold = cfg_geti64("checkpoint_wal_threshold");
+	wal_set_checkpoint_threshold(checkpoint_threshold);
+}
+
+void
 box_set_checkpoint_count(void)
 {
 	int checkpoint_count = cfg_geti("checkpoint_count");
@@ -2111,6 +2118,17 @@ on_wal_garbage_collection(const struct vclock *vclock)
 	gc_advance(vclock);
 }
 
+static void
+on_wal_checkpoint_threshold(void)
+{
+	if (box_checkpoint_is_in_progress)
+		return;
+
+	say_info("WAL threshold exceeded, triggering checkpoint");
+	next_checkpoint_time = ev_monotonic_now(loop());
+	fiber_wakeup(checkpoint_daemon);
+}
+
 void
 box_init(void)
 {
@@ -2225,7 +2243,8 @@ box_cfg_xc(void)
 	enum wal_mode wal_mode = box_check_wal_mode(cfg_gets("wal_mode"));
 	if (wal_init(wal_mode, cfg_gets("wal_dir"), wal_max_rows,
 		     wal_max_size, &INSTANCE_UUID, &replicaset.vclock,
-		     &checkpoint->vclock, on_wal_garbage_collection) != 0) {
+		     &checkpoint->vclock, on_wal_garbage_collection,
+		     on_wal_checkpoint_threshold) != 0) {
 		diag_raise();
 	}
 
diff --git a/src/box/box.h b/src/box/box.h
index 9bde583d..eb037d07 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -200,6 +200,7 @@ void box_set_too_long_threshold(void);
 void box_set_readahead(void);
 void box_set_checkpoint_count(void);
 void box_set_checkpoint_interval(void);
+void box_set_checkpoint_wal_threshold(void);
 void box_set_memtx_memory(void);
 void box_set_memtx_max_tuple_size(void);
 void box_set_vinyl_memory(void);
diff --git a/src/box/lua/cfg.cc b/src/box/lua/cfg.cc
index 4f08c78e..4884ce01 100644
--- a/src/box/lua/cfg.cc
+++ b/src/box/lua/cfg.cc
@@ -176,6 +176,17 @@ lbox_cfg_set_checkpoint_interval(struct lua_State *L)
 }
 
 static int
+lbox_cfg_set_checkpoint_wal_threshold(struct lua_State *L)
+{
+	try {
+		box_set_checkpoint_wal_threshold();
+	} catch (Exception *) {
+		luaT_error(L);
+	}
+	return 0;
+}
+
+static int
 lbox_cfg_set_read_only(struct lua_State *L)
 {
 	try {
@@ -352,6 +363,7 @@ box_lua_cfg_init(struct lua_State *L)
 		{"cfg_set_snap_io_rate_limit", lbox_cfg_set_snap_io_rate_limit},
 		{"cfg_set_checkpoint_count", lbox_cfg_set_checkpoint_count},
 		{"cfg_set_checkpoint_interval", lbox_cfg_set_checkpoint_interval},
+		{"cfg_set_checkpoint_wal_threshold", lbox_cfg_set_checkpoint_wal_threshold},
 		{"cfg_set_read_only", lbox_cfg_set_read_only},
 		{"cfg_set_memtx_memory", lbox_cfg_set_memtx_memory},
 		{"cfg_set_memtx_max_tuple_size", lbox_cfg_set_memtx_max_tuple_size},
diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua
index a9e6fe07..4f887b1a 100644
--- a/src/box/lua/load_cfg.lua
+++ b/src/box/lua/load_cfg.lua
@@ -68,6 +68,7 @@ local default_cfg = {
     read_only           = false,
     hot_standby         = false,
     checkpoint_interval = 3600,
+    checkpoint_wal_threshold = 0,
     checkpoint_count    = 2,
     worker_pool_threads = 4,
     replication_timeout = 1,
@@ -128,6 +129,7 @@ local template_cfg = {
     username            = 'string',
     coredump            = 'boolean',
     checkpoint_interval = 'number',
+    checkpoint_wal_threshold = 'number',
     checkpoint_count    = 'number',
     read_only           = 'boolean',
     hot_standby         = 'boolean',
@@ -228,6 +230,7 @@ local dynamic_cfg = {
     vinyl_timeout           = private.cfg_set_vinyl_timeout,
     checkpoint_count        = private.cfg_set_checkpoint_count,
     checkpoint_interval     = private.cfg_set_checkpoint_interval,
+    checkpoint_wal_threshold = private.cfg_set_checkpoint_wal_threshold,
     worker_pool_threads     = private.cfg_set_worker_pool_threads,
     feedback_enabled        = private.feedback_daemon.set_feedback_params,
     feedback_host           = private.feedback_daemon.set_feedback_params,
diff --git a/src/box/wal.c b/src/box/wal.c
index bbb8376c..1933c6b4 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -92,6 +92,7 @@ struct wal_writer
 	struct journal base;
 	/* ----------------- tx ------------------- */
 	wal_on_garbage_collection_f on_garbage_collection;
+	wal_on_checkpoint_threshold_f on_checkpoint_threshold;
 	/**
 	 * The rollback queue. An accumulator for all requests
 	 * that need to be rolled back. Also acts as a valve
@@ -126,6 +127,25 @@ struct wal_writer
 	 * recover from it even if it is running out of disk space.
 	 */
 	struct vclock checkpoint_vclock;
+	/**
+	 * Total size of WAL files written since the last
+	 * checkpoint.
+	 */
+	int64_t checkpoint_wal_size;
+	/**
+	 * If greater than 0, this variable sets a limit on the
+	 * total size of WAL files written since the last checkpoint.
+	 * Exceeding it will trigger auto checkpointing in tx.
+	 */
+	int64_t checkpoint_threshold;
+	/**
+	 * Set if the WAL thread has signalled the TX thread that
+	 * the checkpoint threshold has been exceeded, cleared
+	 * on checkpoint completion. Needed so as not to invoke
+	 * the tx callback over and over again while checkpointing
+	 * is in progress.
+	 */
+	bool checkpoint_threshold_signalled;
 	/** The current WAL file. */
 	struct xlog current_wal;
 	/**
@@ -154,6 +174,11 @@ struct wal_msg {
 	 */
 	struct stailq rollback;
 	/**
+	 * Set if the checkpoint threshold was exceeded while
+	 * processing this request.
+	 */
+	bool checkpoint_threshold_exceeded;
+	/**
 	 * Set if the WAL thread ran out of disk space while
 	 * processing this request and had to delete some old
 	 * WAL files.
@@ -201,6 +226,7 @@ wal_msg_create(struct wal_msg *batch)
 	cmsg_init(&batch->base, wal_request_route);
 	batch->approx_len = 0;
 	batch->gc_executed = false;
+	batch->checkpoint_threshold_exceeded = false;
 	stailq_create(&batch->commit);
 	stailq_create(&batch->rollback);
 }
@@ -282,6 +308,8 @@ tx_schedule_commit(struct cmsg *msg)
 
 	if (batch->gc_executed)
 		writer->on_garbage_collection(&batch->gc_vclock);
+	if (batch->checkpoint_threshold_exceeded)
+		writer->on_checkpoint_threshold();
 }
 
 static void
@@ -313,7 +341,8 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
 		  int64_t wal_max_size, const struct tt_uuid *instance_uuid,
 		  const struct vclock *vclock,
 		  const struct vclock *checkpoint_vclock,
-		  wal_on_garbage_collection_f on_garbage_collection)
+		  wal_on_garbage_collection_f on_garbage_collection,
+		  wal_on_checkpoint_threshold_f on_checkpoint_threshold)
 {
 	writer->wal_mode = wal_mode;
 	writer->wal_max_rows = wal_max_rows;
@@ -329,11 +358,16 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
 	stailq_create(&writer->rollback);
 	cmsg_init(&writer->in_rollback, NULL);
 
+	writer->checkpoint_wal_size = 0;
+	writer->checkpoint_threshold = 0;
+	writer->checkpoint_threshold_signalled = false;
+
 	vclock_copy(&writer->vclock, vclock);
 	vclock_copy(&writer->checkpoint_vclock, checkpoint_vclock);
 	rlist_create(&writer->watchers);
 
 	writer->on_garbage_collection = on_garbage_collection;
+	writer->on_checkpoint_threshold = on_checkpoint_threshold;
 }
 
 /** Destroy a WAL writer structure. */
@@ -439,14 +473,16 @@ int
 wal_init(enum wal_mode wal_mode, const char *wal_dirname, int64_t wal_max_rows,
 	 int64_t wal_max_size, const struct tt_uuid *instance_uuid,
 	 const struct vclock *vclock, const struct vclock *checkpoint_vclock,
-	 wal_on_garbage_collection_f on_garbage_collection)
+	 wal_on_garbage_collection_f on_garbage_collection,
+	 wal_on_checkpoint_threshold_f on_checkpoint_threshold)
 {
 	assert(wal_max_rows > 1);
 
 	struct wal_writer *writer = &wal_writer_singleton;
 	wal_writer_create(writer, wal_mode, wal_dirname, wal_max_rows,
 			  wal_max_size, instance_uuid, vclock,
-			  checkpoint_vclock, on_garbage_collection);
+			  checkpoint_vclock, on_garbage_collection,
+			  on_checkpoint_threshold);
 
 	/*
 	 * Scan the WAL directory to build an index of all
@@ -517,6 +553,7 @@ wal_begin_checkpoint_f(struct cbus_call_msg *data)
 		 */
 	}
 	vclock_copy(&msg->vclock, &writer->vclock);
+	msg->wal_size = writer->checkpoint_wal_size;
 	return 0;
 }
 
@@ -526,6 +563,7 @@ wal_begin_checkpoint(struct wal_checkpoint *checkpoint)
 	struct wal_writer *writer = &wal_writer_singleton;
 	if (writer->wal_mode == WAL_NONE) {
 		vclock_copy(&checkpoint->vclock, &writer->vclock);
+		checkpoint->wal_size = 0;
 		return 0;
 	}
 	if (!stailq_empty(&writer->rollback)) {
@@ -554,7 +592,20 @@ wal_commit_checkpoint_f(struct cbus_call_msg *data)
 {
 	struct wal_checkpoint *msg = (struct wal_checkpoint *) data;
 	struct wal_writer *writer = &wal_writer_singleton;
+	/*
+	 * Now, once checkpoint has been created, we can update
+	 * the WAL's version of the last checkpoint vclock and
+	 * reset the size of WAL files written since the last
+	 * checkpoint. Note, since new WAL records may have been
+	 * written while the checkpoint was created, we subtract
+	 * the value of checkpoint_wal_size observed at the time
+	 * when checkpointing started from the current value
+	 * rather than just setting it to 0.
+	 */
 	vclock_copy(&writer->checkpoint_vclock, &msg->vclock);
+	assert(writer->checkpoint_wal_size >= msg->wal_size);
+	writer->checkpoint_wal_size -= msg->wal_size;
+	writer->checkpoint_threshold_signalled = false;
 	return 0;
 }
 
@@ -573,6 +624,37 @@ wal_commit_checkpoint(struct wal_checkpoint *checkpoint)
 	fiber_set_cancellable(cancellable);
 }
 
+struct wal_set_checkpoint_threshold_msg
+{
+	struct cbus_call_msg base;
+	int64_t checkpoint_threshold;
+};
+
+static int
+wal_set_checkpoint_threshold_f(struct cbus_call_msg *data)
+{
+	struct wal_writer *writer = &wal_writer_singleton;
+	struct wal_set_checkpoint_threshold_msg *msg;
+	msg = (struct wal_set_checkpoint_threshold_msg *)data;
+	writer->checkpoint_threshold = msg->checkpoint_threshold;
+	return 0;
+}
+
+void
+wal_set_checkpoint_threshold(int64_t checkpoint_threshold)
+{
+	struct wal_writer *writer = &wal_writer_singleton;
+	if (writer->wal_mode == WAL_NONE)
+		return;
+	struct wal_set_checkpoint_threshold_msg msg;
+	msg.checkpoint_threshold = checkpoint_threshold;
+	bool cancellable = fiber_set_cancellable(false);
+	cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe,
+		  &msg.base, wal_set_checkpoint_threshold_f, NULL,
+		  TIMEOUT_INFINITY);
+	fiber_set_cancellable(cancellable);
+}
+
 struct wal_gc_msg
 {
 	struct cbus_call_msg base;
@@ -859,23 +941,35 @@ wal_write_to_disk(struct cmsg *msg)
 	/*
 	 * Iterate over requests (transactions)
 	 */
+	int rc;
 	struct journal_entry *entry;
 	struct stailq_entry *last_committed = NULL;
 	stailq_foreach_entry(entry, &wal_msg->commit, fifo) {
 		wal_assign_lsn(writer, entry->rows, entry->rows + entry->n_rows);
 		entry->res = vclock_sum(&writer->vclock);
-		int rc = xlog_write_entry(l, entry);
+		rc = xlog_write_entry(l, entry);
 		if (rc < 0)
 			goto done;
-		if (rc > 0)
+		if (rc > 0) {
+			writer->checkpoint_wal_size += rc;
 			last_committed = &entry->fifo;
+		}
 		/* rc == 0: the write is buffered in xlog_tx */
 	}
-	if (xlog_flush(l) < 0)
+	rc = xlog_flush(l);
+	if (rc < 0)
 		goto done;
 
+	writer->checkpoint_wal_size += rc;
 	last_committed = stailq_last(&wal_msg->commit);
 
+	if (writer->checkpoint_threshold > 0 &&
+	    !writer->checkpoint_threshold_signalled &&
+	    writer->checkpoint_wal_size > writer->checkpoint_threshold) {
+		writer->checkpoint_threshold_signalled = true;
+		wal_msg->checkpoint_threshold_exceeded = true;
+	}
+
 done:
 	error = diag_last_error(diag_get());
 	if (error) {
diff --git a/src/box/wal.h b/src/box/wal.h
index 3f0fae07..1ba8bf8b 100644
--- a/src/box/wal.h
+++ b/src/box/wal.h
@@ -61,6 +61,13 @@ extern "C" {
  */
 typedef void (*wal_on_garbage_collection_f)(const struct vclock *vclock);
 
+/**
+ * Callback invoked on behalf of the tx thread upon request
+ * completion when the total size of WAL files written since
+ * the last checkpoint exceeds the configured threshold.
+ */
+typedef void (*wal_on_checkpoint_threshold_f)(void);
+
 void
 wal_thread_start();
 
@@ -68,7 +75,8 @@ int
 wal_init(enum wal_mode wal_mode, const char *wal_dirname, int64_t wal_max_rows,
 	 int64_t wal_max_size, const struct tt_uuid *instance_uuid,
 	 const struct vclock *vclock, const struct vclock *checkpoint_vclock,
-	 wal_on_garbage_collection_f on_garbage_collection);
+	 wal_on_garbage_collection_f on_garbage_collection,
+	 wal_on_checkpoint_threshold_f on_checkpoint_threshold);
 
 void
 wal_thread_stop();
@@ -168,6 +176,12 @@ struct wal_checkpoint {
 	 * identify the new checkpoint.
 	 */
 	struct vclock vclock;
+	/**
+	 * Size of WAL files written since the last checkpoint.
+	 * Used to reset the corresponding WAL counter upon
+	 * successful checkpoint creation.
+	 */
+	int64_t wal_size;
 };
 
 /**
@@ -190,6 +204,15 @@ void
 wal_commit_checkpoint(struct wal_checkpoint *checkpoint);
 
 /**
+ * Limit the size of WAL files written since the last checkpoint.
+ * Exceeding the limit will trigger automatic checkpointing in tx.
+ * If @checkpoint_threshold is less than or equal to 0, automatic
+ * checkpointing will be disabled.
+ */
+void
+wal_set_checkpoint_threshold(int64_t checkpoint_threshold);
+
+/**
  * Remove WAL files that are not needed by consumers reading
  * rows at @vclock or newer.
  */
diff --git a/test/app-tap/init_script.result b/test/app-tap/init_script.result
index b03e5159..fd58ae94 100644
--- a/test/app-tap/init_script.result
+++ b/test/app-tap/init_script.result
@@ -6,49 +6,50 @@ box.cfg
 1	background:false
 2	checkpoint_count:2
 3	checkpoint_interval:3600
-4	coredump:false
-5	feedback_enabled:true
-6	feedback_host:https://feedback.tarantool.io
-7	feedback_interval:3600
-8	force_recovery:false
-9	hot_standby:false
-10	listen:port
-11	log:tarantool.log
-12	log_format:plain
-13	log_level:5
-14	memtx_dir:.
-15	memtx_max_tuple_size:1048576
-16	memtx_memory:107374182
-17	memtx_min_tuple_size:16
-18	net_msg_max:768
-19	pid_file:box.pid
-20	read_only:false
-21	readahead:16320
-22	replication_connect_timeout:30
-23	replication_skip_conflict:false
-24	replication_sync_lag:10
-25	replication_sync_timeout:300
-26	replication_timeout:1
-27	rows_per_wal:500000
-28	slab_alloc_factor:1.05
-29	too_long_threshold:0.5
-30	vinyl_bloom_fpr:0.05
-31	vinyl_cache:134217728
-32	vinyl_dir:.
-33	vinyl_max_tuple_size:1048576
-34	vinyl_memory:134217728
-35	vinyl_page_size:8192
-36	vinyl_range_size:1073741824
-37	vinyl_read_threads:1
-38	vinyl_run_count_per_level:2
-39	vinyl_run_size_ratio:3.5
-40	vinyl_timeout:60
-41	vinyl_write_threads:4
-42	wal_dir:.
-43	wal_dir_rescan_delay:2
-44	wal_max_size:268435456
-45	wal_mode:write
-46	worker_pool_threads:4
+4	checkpoint_wal_threshold:0
+5	coredump:false
+6	feedback_enabled:true
+7	feedback_host:https://feedback.tarantool.io
+8	feedback_interval:3600
+9	force_recovery:false
+10	hot_standby:false
+11	listen:port
+12	log:tarantool.log
+13	log_format:plain
+14	log_level:5
+15	memtx_dir:.
+16	memtx_max_tuple_size:1048576
+17	memtx_memory:107374182
+18	memtx_min_tuple_size:16
+19	net_msg_max:768
+20	pid_file:box.pid
+21	read_only:false
+22	readahead:16320
+23	replication_connect_timeout:30
+24	replication_skip_conflict:false
+25	replication_sync_lag:10
+26	replication_sync_timeout:300
+27	replication_timeout:1
+28	rows_per_wal:500000
+29	slab_alloc_factor:1.05
+30	too_long_threshold:0.5
+31	vinyl_bloom_fpr:0.05
+32	vinyl_cache:134217728
+33	vinyl_dir:.
+34	vinyl_max_tuple_size:1048576
+35	vinyl_memory:134217728
+36	vinyl_page_size:8192
+37	vinyl_range_size:1073741824
+38	vinyl_read_threads:1
+39	vinyl_run_count_per_level:2
+40	vinyl_run_size_ratio:3.5
+41	vinyl_timeout:60
+42	vinyl_write_threads:4
+43	wal_dir:.
+44	wal_dir_rescan_delay:2
+45	wal_max_size:268435456
+46	wal_mode:write
+47	worker_pool_threads:4
 --
 -- Test insert from detached fiber
 --
diff --git a/test/box/admin.result b/test/box/admin.result
index 6da53f30..80e220cc 100644
--- a/test/box/admin.result
+++ b/test/box/admin.result
@@ -32,6 +32,8 @@ cfg_filter(box.cfg)
     - 2
   - - checkpoint_interval
     - 3600
+  - - checkpoint_wal_threshold
+    - 0
   - - coredump
     - false
   - - feedback_enabled
diff --git a/test/box/cfg.result b/test/box/cfg.result
index 01e6bc6b..248a7e8c 100644
--- a/test/box/cfg.result
+++ b/test/box/cfg.result
@@ -20,6 +20,8 @@ cfg_filter(box.cfg)
     - 2
   - - checkpoint_interval
     - 3600
+  - - checkpoint_wal_threshold
+    - 0
   - - coredump
     - false
   - - feedback_enabled
@@ -119,6 +121,8 @@ cfg_filter(box.cfg)
     - 2
   - - checkpoint_interval
     - 3600
+  - - checkpoint_wal_threshold
+    - 0
   - - coredump
     - false
   - - feedback_enabled
diff --git a/test/xlog/checkpoint_threshold.result b/test/xlog/checkpoint_threshold.result
new file mode 100644
index 00000000..bad5e2be
--- /dev/null
+++ b/test/xlog/checkpoint_threshold.result
@@ -0,0 +1,112 @@
+test_run = require('test_run').new()
+---
+...
+fiber = require('fiber')
+---
+...
+digest = require('digest')
+---
+...
+threshold = 10 * 1024
+---
+...
+box.cfg{checkpoint_wal_threshold = threshold}
+---
+...
+s = box.schema.space.create('test')
+---
+...
+_ = s:create_index('pk')
+---
+...
+box.snapshot()
+---
+- ok
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+function put(size)
+    s:auto_increment{digest.urandom(size)}
+end;
+---
+...
+function wait_checkpoint(signature)
+    signature = signature or box.info.signature
+    return test_run:wait_cond(function()
+        local checkpoints = box.info.gc().checkpoints
+        return signature == checkpoints[#checkpoints].signature
+    end, 10)
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+--
+-- Check that checkpointing is triggered automatically once
+-- the size of WAL files written since the last checkpoint
+-- exceeds box.cfg.checkpoint_wal_threshold (gh-1082).
+--
+for i = 1, 3 do put(threshold / 3) end
+---
+...
+wait_checkpoint()
+---
+- true
+...
+for i = 1, 5 do put(threshold / 5) end
+---
+...
+wait_checkpoint()
+---
+- true
+...
+--
+-- Check that WAL rows written while a checkpoint was created
+-- are accounted as written after the checkpoint.
+--
+box.error.injection.set('ERRINJ_SNAP_COMMIT_DELAY', true)
+---
+- ok
+...
+-- This should trigger checkpointing, which will take quite
+-- a while due to the injected delay.
+for i = 1, 5 do put(threshold / 5) end
+---
+...
+fiber.sleep(0)
+---
+...
+-- Remember the future checkpoint signature.
+signature = box.info.signature
+---
+...
+-- Insert some records while the checkpoint is created.
+for i = 1, 4 do put(threshold / 5) end
+---
+...
+-- Disable the delay and wait for checkpointing to complete.
+box.error.injection.set('ERRINJ_SNAP_COMMIT_DELAY', false)
+---
+- ok
+...
+wait_checkpoint(signature)
+---
+- true
+...
+-- Check that insertion of one more record triggers another
+-- checkpoint, because it sums up with records written while
+-- the previous checkpoint was created.
+put(threshold / 5)
+---
+...
+wait_checkpoint()
+---
+- true
+...
+box.cfg{checkpoint_wal_threshold = 0}
+---
+...
diff --git a/test/xlog/checkpoint_threshold.test.lua b/test/xlog/checkpoint_threshold.test.lua
new file mode 100644
index 00000000..7057d835
--- /dev/null
+++ b/test/xlog/checkpoint_threshold.test.lua
@@ -0,0 +1,62 @@
+test_run = require('test_run').new()
+fiber = require('fiber')
+digest = require('digest')
+
+threshold = 10 * 1024
+box.cfg{checkpoint_wal_threshold = threshold}
+
+s = box.schema.space.create('test')
+_ = s:create_index('pk')
+box.snapshot()
+
+test_run:cmd("setopt delimiter ';'")
+function put(size)
+    s:auto_increment{digest.urandom(size)}
+end;
+function wait_checkpoint(signature)
+    signature = signature or box.info.signature
+    return test_run:wait_cond(function()
+        local checkpoints = box.info.gc().checkpoints
+        return signature == checkpoints[#checkpoints].signature
+    end, 10)
+end;
+test_run:cmd("setopt delimiter ''");
+
+--
+-- Check that checkpointing is triggered automatically once
+-- the size of WAL files written since the last checkpoint
+-- exceeds box.cfg.checkpoint_wal_threshold (gh-1082).
+--
+for i = 1, 3 do put(threshold / 3) end
+wait_checkpoint()
+for i = 1, 5 do put(threshold / 5) end
+wait_checkpoint()
+
+--
+-- Check that WAL rows written while a checkpoint was created
+-- are accounted as written after the checkpoint.
+--
+box.error.injection.set('ERRINJ_SNAP_COMMIT_DELAY', true)
+
+-- This should trigger checkpointing, which will take quite
+-- a while due to the injected delay.
+for i = 1, 5 do put(threshold / 5) end
+fiber.sleep(0)
+
+-- Remember the future checkpoint signature.
+signature = box.info.signature
+
+-- Insert some records while the checkpoint is created.
+for i = 1, 4 do put(threshold / 5) end
+
+-- Disable the delay and wait for checkpointing to complete.
+box.error.injection.set('ERRINJ_SNAP_COMMIT_DELAY', false)
+wait_checkpoint(signature)
+
+-- Check that insertion of one more record triggers another
+-- checkpoint, because it sums up with records written while
+-- the previous checkpoint was created.
+put(threshold / 5)
+wait_checkpoint()
+
+box.cfg{checkpoint_wal_threshold = 0}
diff --git a/test/xlog/suite.ini b/test/xlog/suite.ini
index 4f82295d..4043f370 100644
--- a/test/xlog/suite.ini
+++ b/test/xlog/suite.ini
@@ -4,7 +4,7 @@ description = tarantool write ahead log tests
 script = xlog.lua
 disabled = snap_io_rate.test.lua upgrade.test.lua
 valgrind_disabled =
-release_disabled = errinj.test.lua panic_on_lsn_gap.test.lua panic_on_broken_lsn.test.lua
+release_disabled = errinj.test.lua panic_on_lsn_gap.test.lua panic_on_broken_lsn.test.lua checkpoint_threshold.test.lua
 config = suite.cfg
 use_unix_sockets = True
 long_run = snap_io_rate.test.lua
-- 
2.11.0
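
The counter logic added by this patch can be modelled in isolation
roughly as follows. This is only a sketch, not code from the patch:
struct wal_counter and the wal_counter_* helpers are hypothetical
names, and only the field names mirror those added to struct
wal_writer. It shows why the commit path subtracts the size observed
when checkpointing started instead of zeroing the counter.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for the relevant wal_writer fields. */
struct wal_counter {
	/* Bytes written to WAL since the last checkpoint. */
	int64_t checkpoint_wal_size;
	/* Configured limit; a value <= 0 disables the trigger. */
	int64_t checkpoint_threshold;
	/* Set once the excess has been reported to tx. */
	bool checkpoint_threshold_signalled;
};

/* Account written bytes; return true if tx should be notified. */
static bool
wal_counter_account(struct wal_counter *c, int64_t written)
{
	c->checkpoint_wal_size += written;
	if (c->checkpoint_threshold > 0 &&
	    !c->checkpoint_threshold_signalled &&
	    c->checkpoint_wal_size > c->checkpoint_threshold) {
		c->checkpoint_threshold_signalled = true;
		return true;
	}
	return false;
}

/* Remember the counter value at the time a checkpoint begins. */
static int64_t
wal_counter_begin_checkpoint(const struct wal_counter *c)
{
	return c->checkpoint_wal_size;
}

/*
 * On commit, subtract the remembered value so that bytes written
 * while the checkpoint was in progress stay accounted.
 */
static void
wal_counter_commit_checkpoint(struct wal_counter *c, int64_t size_at_begin)
{
	assert(c->checkpoint_wal_size >= size_at_begin);
	c->checkpoint_wal_size -= size_at_begin;
	c->checkpoint_threshold_signalled = false;
}
```

With this model, rows written between begin and commit count towards
the next checkpoint, which is the behaviour the second half of
checkpoint_threshold.test.lua checks.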

^ permalink raw reply	[flat|nested] 28+ messages in thread

* [tarantool-patches] Re: [PATCH 1/9] wal: separate checkpoint and flush paths
  2018-11-28 16:14 ` [PATCH 1/9] wal: separate checkpoint and flush paths Vladimir Davydov
@ 2018-11-29 16:24   ` Konstantin Osipov
  0 siblings, 0 replies; 28+ messages in thread
From: Konstantin Osipov @ 2018-11-29 16:24 UTC (permalink / raw)
  To: tarantool-patches

* Vladimir Davydov <vdavydov.dev@gmail.com> [18/11/28 19:16]:
> Currently, wal_checkpoint() is used for two purposes. First, to make a
> checkpoint (rotate = true). Second, to flush all pending WAL requests
> (rotate = false). Since checkpointing has to fail if cascading rollback
> is in progress so does flushing. This is confusing. Let's separate the
> two paths.

I renamed wal_flush() to wal_sync() and pushed the patch.


-- 
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov

^ permalink raw reply	[flat|nested] 28+ messages in thread

* [tarantool-patches] Re: [PATCH 2/9] wal: remove files needed for recovery from backup checkpoints on ENOSPC
  2018-11-28 16:14 ` [PATCH 2/9] wal: remove files needed for recovery from backup checkpoints on ENOSPC Vladimir Davydov
@ 2018-11-29 16:31   ` Konstantin Osipov
  2018-11-29 17:42     ` Vladimir Davydov
  0 siblings, 1 reply; 28+ messages in thread
From: Konstantin Osipov @ 2018-11-29 16:31 UTC (permalink / raw)
  To: tarantool-patches

* Vladimir Davydov <vdavydov.dev@gmail.com> [18/11/28 19:16]:
> Tarantool always keeps box.cfg.checkpoint_count latest checkpoints. It
> also never deletes WAL files needed for recovery from any of them for
> the sake of redundancy, even if it gets ENOSPC while trying to write to
> WAL. This patch changes that behavior: now the WAL thread is allowed to
> delete backup WAL files in case of emergency ENOSPC - after all it's
> better than stopping operation.

If you wish WAL to be part of some of the engine APIs, such as
checkpointing, you could make it an engine and register in the
engine list. MySQL does it this way.

The patch is OK to push.


-- 
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov

^ permalink raw reply	[flat|nested] 28+ messages in thread

* [tarantool-patches] Re: [PATCH 3/9] recovery: restore garbage collector vclock after restart
  2018-11-28 16:14 ` [PATCH 3/9] recovery: restore garbage collector vclock after restart Vladimir Davydov
@ 2018-11-29 16:37   ` Konstantin Osipov
  2018-11-29 17:42     ` Vladimir Davydov
  0 siblings, 1 reply; 28+ messages in thread
From: Konstantin Osipov @ 2018-11-29 16:37 UTC (permalink / raw)
  To: tarantool-patches

* Vladimir Davydov <vdavydov.dev@gmail.com> [18/11/28 19:16]:
> After restart the garbage collector vclock is reset to the vclock of the
> oldest preserved checkpoint, which is incorrect - it may be less in case
> there is a replica that lagged behind, and it may be greater as well in
> case the WAL thread hit ENOSPC and had to remove some WAL files to
> continue. Fix it.

OK to push.


-- 
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov

^ permalink raw reply	[flat|nested] 28+ messages in thread

* [tarantool-patches] Re: [PATCH 4/9] gc: run garbage collection in background
  2018-11-28 16:14 ` [PATCH 4/9] gc: run garbage collection in background Vladimir Davydov
@ 2018-11-29 16:42   ` Konstantin Osipov
  2018-11-29 17:43     ` Vladimir Davydov
  0 siblings, 1 reply; 28+ messages in thread
From: Konstantin Osipov @ 2018-11-29 16:42 UTC (permalink / raw)
  To: tarantool-patches

* Vladimir Davydov <vdavydov.dev@gmail.com> [18/11/28 19:16]:
> Currently, garbage collection is executed synchronously by functions
> that may trigger it, such as gc_consumer_advance or gc_add_checkpoint.
> As a result, one has to be very cautious when using those functions as
> they may yield at their will. For example, we can't shoot off stale
> consumers right in tx_prio handler - we have to use rather clumsy WAL
> watcher interface instead. Besides, in future, when the garbage
> collector state is persisted, we will need to call those functions from
> on_commit trigger callback, where yielding is not normally allowed.
> 
> Actually, there's no reason to remove old files synchronously - we could
> as well do it in the background. So this patch introduces a background
> garbage collection fiber that executes gc_run when woken up. Now all
> functions that might trigger garbage collection wake up this fiber
> instead of executing gc_run directly.

OK to push.


-- 
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov

^ permalink raw reply	[flat|nested] 28+ messages in thread

* [tarantool-patches] Re: [PATCH 5/9] gc: do not use WAL watcher API for deactivating stale consumers
  2018-11-28 16:14 ` [PATCH 5/9] gc: do not use WAL watcher API for deactivating stale consumers Vladimir Davydov
@ 2018-11-29 17:02   ` Konstantin Osipov
  0 siblings, 0 replies; 28+ messages in thread
From: Konstantin Osipov @ 2018-11-29 17:02 UTC (permalink / raw)
  To: tarantool-patches

* Vladimir Davydov <vdavydov.dev@gmail.com> [18/11/28 19:16]:
> The WAL thread may delete old WAL files if it gets ENOSPC error.
> Currently, we use WAL watcher API to notify the TX thread about it so
> that it can shoot off stale replicas. This looks ugly, because WAL
> watcher API was initially designed to propagate WAL changes to relay
> threads and the new event WAL_EVENT_GC, which was introduced for
> notifying about ENOSPC-driven garbage collection, isn't used anywhere
> else. Besides, there's already a pipe from WAL to TX - we could reuse it
> instead of opening another one.
> 
> If we followed down that path, then in order to trigger a checkpoint
> from the WAL thread (see #1082), we would have to introduce yet another
> esoteric WAL watcher event, making the whole design look even uglier.
> That said, let's rewrite the garbage collection notification procedure
> using a plain callback instead of abusing WAL watcher API.

Thank you for the patch. As discussed, let's avoid invoking
garbage collection from txn.cc, WAL can alert tx explicitly
whenever an exceptional situation such as ENOSPC happens.


> +	/**
> +	 * Set if the WAL thread ran out of disk space while
> +	 * processing this request and had to delete some old
> +	 * WAL files.
> +	 */
> +	bool gc_executed;

This will be unnecessary if we send a special message.

If you are worried about OOM when creating such a message, you
could
a) ignore the message
b) use a preallocated one, stored in struct wal.
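
Option (b) might look roughly like the sketch below. All names here
(gc_notify_msg, wal_sketch and the helpers) are made up for
illustration and are not part of the patch or of the cbus API; the
point is only that a message embedded in the WAL state needs no
allocation at notification time.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical notification message; not a real cbus message. */
struct gc_notify_msg {
	/* True while the message is on its way to tx. */
	bool in_flight;
};

/* Hypothetical stand-in for the WAL state. */
struct wal_sketch {
	/* Preallocated, so sending never has to allocate memory. */
	struct gc_notify_msg gc_msg;
};

/*
 * Return the preallocated message, or NULL if it is already in
 * flight, in which case the new notification is simply dropped.
 */
static struct gc_notify_msg *
wal_sketch_acquire_gc_msg(struct wal_sketch *wal)
{
	if (wal->gc_msg.in_flight)
		return NULL;
	wal->gc_msg.in_flight = true;
	return &wal->gc_msg;
}

/* Called once tx has processed the message. */
static void
wal_sketch_release_gc_msg(struct gc_notify_msg *msg)
{
	assert(msg->in_flight);
	msg->in_flight = false;
}
```

Dropping a notification while one is in flight assumes the tx-side
handler is idempotent, which is an assumption of this sketch.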

-- 
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov

^ permalink raw reply	[flat|nested] 28+ messages in thread

* [tarantool-patches] Re: [PATCH 6/9] wal: simplify watcher API
  2018-11-28 16:14 ` [PATCH 6/9] wal: simplify watcher API Vladimir Davydov
@ 2018-11-29 17:33   ` Konstantin Osipov
  0 siblings, 0 replies; 28+ messages in thread
From: Konstantin Osipov @ 2018-11-29 17:33 UTC (permalink / raw)
  To: tarantool-patches

* Vladimir Davydov <vdavydov.dev@gmail.com> [18/11/28 19:16]:
> This patch reverts changes done in order to make WAL watcher API
> suitable for notifying TX about WAL garbage collection triggered on
> ENOSPC, namely:

OK to push.


-- 
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [tarantool-patches] Re: [PATCH 2/9] wal: remove files needed for recovery from backup checkpoints on ENOSPC
  2018-11-29 16:31   ` [tarantool-patches] " Konstantin Osipov
@ 2018-11-29 17:42     ` Vladimir Davydov
  0 siblings, 0 replies; 28+ messages in thread
From: Vladimir Davydov @ 2018-11-29 17:42 UTC (permalink / raw)
  To: Konstantin Osipov; +Cc: tarantool-patches

On Thu, Nov 29, 2018 at 07:31:03PM +0300, Konstantin Osipov wrote:
> * Vladimir Davydov <vdavydov.dev@gmail.com> [18/11/28 19:16]:
> > Tarantool always keeps box.cfg.checkpoint_count latest checkpoints. It
> > also never deletes WAL files needed for recovery from any of them for
> > the sake of redundancy, even if it gets ENOSPC while trying to write to
> > WAL. This patch changes that behavior: now the WAL thread is allowed to
> > delete backup WAL files in case of emergency ENOSPC - after all it's
> > better than stopping operation.
> 
> If you wish WAL to be part of some of the engine APIs, such as
> checkpointing, you could make it an engine and register in the
> engine list. MySQL does it this way.
> 
> The patch is OK to push.

Pushed to 2.1.

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [tarantool-patches] Re: [PATCH 3/9] recovery: restore garbage collector vclock after restart
  2018-11-29 16:37   ` [tarantool-patches] " Konstantin Osipov
@ 2018-11-29 17:42     ` Vladimir Davydov
  0 siblings, 0 replies; 28+ messages in thread
From: Vladimir Davydov @ 2018-11-29 17:42 UTC (permalink / raw)
  To: Konstantin Osipov; +Cc: tarantool-patches

On Thu, Nov 29, 2018 at 07:37:15PM +0300, Konstantin Osipov wrote:
> * Vladimir Davydov <vdavydov.dev@gmail.com> [18/11/28 19:16]:
> > After restart the garbage collector vclock is reset to the vclock of the
> > oldest preserved checkpoint, which is incorrect - it may be less in case
> > there is a replica that lagged behind, and it may be greater as well in
> > case the WAL thread hit ENOSPC and had to remove some WAL files to
> > continue. Fix it.
> 
> OK to push.

Pushed to 2.1.

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [tarantool-patches] Re: [PATCH 4/9] gc: run garbage collection in background
  2018-11-29 16:42   ` [tarantool-patches] " Konstantin Osipov
@ 2018-11-29 17:43     ` Vladimir Davydov
  0 siblings, 0 replies; 28+ messages in thread
From: Vladimir Davydov @ 2018-11-29 17:43 UTC (permalink / raw)
  To: Konstantin Osipov; +Cc: tarantool-patches

On Thu, Nov 29, 2018 at 07:42:06PM +0300, Konstantin Osipov wrote:
> * Vladimir Davydov <vdavydov.dev@gmail.com> [18/11/28 19:16]:
> > Currently, garbage collection is executed synchronously by functions
> > that may trigger it, such as gc_consumer_advance or gc_add_checkpoint.
> > As a result, one has to be very cautious when using those functions as
> > they may yield at their will. For example, we can't shoot off stale
> > consumers right in tx_prio handler - we have to use rather clumsy WAL
> > watcher interface instead. Besides, in future, when the garbage
> > collector state is persisted, we will need to call those functions from
> > on_commit trigger callback, where yielding is not normally allowed.
> > 
> > Actually, there's no reason to remove old files synchronously - we could
> > as well do it in the background. So this patch introduces a background
> > garbage collection fiber that executes gc_run when woken up. Now all
> > functions that might trigger garbage collection wake up this fiber
> > instead of executing gc_run directly.
> 
> OK to push.

Pushed to 2.1.

^ permalink raw reply	[flat|nested] 28+ messages in thread

* [tarantool-patches] Re: [PATCH 7/9] box: rewrite checkpoint daemon in C
  2018-11-28 16:14 ` [PATCH 7/9] box: rewrite checkpoint daemon in C Vladimir Davydov
@ 2018-11-30  8:58   ` Konstantin Osipov
  2018-11-30  9:41     ` Vladimir Davydov
  0 siblings, 1 reply; 28+ messages in thread
From: Konstantin Osipov @ 2018-11-30  8:58 UTC (permalink / raw)
  To: tarantool-patches

* Vladimir Davydov <vdavydov.dev@gmail.com> [18/11/28 19:16]:
> Long time ago, when the checkpoint daemon was added to Tarantool, it was
> responsible not only for making periodic checkpoints, but also for
> maintaining the configured number of checkpoints and removing old snap
> and xlog times, so it was much easier to implement it in Lua than in C.
files 
> However, over time, all its responsibilities have been reimplemented in
> C and moved to the server code so that now it just calls box.snapshot()
> periodically. Let's rewrite this simple procedure in C as well - this
> will allow us to easily add more complex logic there, e.g. triggering
> checkpoint when WAL files exceed a configured threshold.
> ---
>  src/box/CMakeLists.txt               |   1 -
>  src/box/box.cc                       | 102 ++++++++++++++++++++++++
>  src/box/box.h                        |   1 +
>  src/box/lua/cfg.cc                   |  12 +++
>  src/box/lua/checkpoint_daemon.lua    | 136 --------------------------------
>  src/box/lua/init.c                   |   2 -
>  src/box/lua/load_cfg.lua             |   2 +-
>  test/xlog/checkpoint_daemon.result   | 145 -----------------------------------

Could we please move this fiber to gc.c at least? Let's not
pollute box?

> +		if (box_checkpoint_is_in_progress) {
> +			/*
> +			 * The next checkpoint will be scheduled
> +			 * by the concurrent box_checkpoint().
> +			 */

This is rather fragile. Can we make the interaction between
box-checkpoint and checkpoint daemon less obvious? Imagine in
future we can perform checkpoints according to a cron. 

I think it would be better if the logic of next checkpoint time
calculation is consolidated in the checkpoint daemon alone. If the
daemon sees that a checkpoint is in progress it can skip the
current checkpoint, but not delay the next checkpoint till
infinity.
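
A rough sketch of that idea, with the scheduling decision kept in one
place. The struct and function names are hypothetical and the timer
handling is simplified to plain doubles; this is not the daemon code
from the patch.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical result of one timer expiration in the daemon. */
struct daemon_decision {
	/* Whether the daemon should start a checkpoint now. */
	bool run_checkpoint;
	/* When the daemon should wake up next. */
	double next_checkpoint_time;
};

/*
 * Decide what to do when the timer fires: skip the checkpoint if
 * one is already in progress, but always schedule the next wakeup
 * a full interval ahead rather than at infinity.
 */
static struct daemon_decision
checkpoint_daemon_on_timer(double now, double interval, bool in_progress)
{
	assert(interval > 0);
	struct daemon_decision d;
	d.run_checkpoint = !in_progress;
	d.next_checkpoint_time = now + interval;
	return d;
}
```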

> +			next_checkpoint_time = now + TIMEOUT_INFINITY;
> +			continue;
> +		}
> +		box_checkpoint();
> +	}
> +	checkpoint_daemon = NULL;
> +	return 0;
> +}
> +
> --- a/test/xlog/checkpoint_daemon.test.lua
> +++ b/test/xlog/checkpoint_daemon.test.lua

Please keep the test. I don't understand what's wrong with it,
nor is it obvious from the changeset comments.

 

-- 
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov

^ permalink raw reply	[flat|nested] 28+ messages in thread

* [tarantool-patches] Re: [PATCH 8/9] wal: pass struct instead of vclock to checkpoint methods
  2018-11-28 16:14 ` [PATCH 8/9] wal: pass struct instead of vclock to checkpoint methods Vladimir Davydov
@ 2018-11-30  9:00   ` Konstantin Osipov
  2018-11-30  9:43     ` Vladimir Davydov
  0 siblings, 1 reply; 28+ messages in thread
From: Konstantin Osipov @ 2018-11-30  9:00 UTC (permalink / raw)
  To: tarantool-patches

* Vladimir Davydov <vdavydov.dev@gmail.com> [18/11/28 19:16]:
> Currently, we only need to pass a vclock between TX and WAL during
> checkpointing. However, in order to implement auto-checkpointing
> triggered when WAL size exceeds a certain threshold, we will need to
> pass some extra info so that we can properly reset the counter
> accounting the WAL size in the WAL thread. To make it possible, let's
> move wal_checkpoint struct, which is used internally by WAL to pass a
> checkpoint vclock, to the header and require the caller to pass it to
> wal_begin/commit_checkpoint instead of just a vclock.

wal is a singleton and there could be only one ongoing checkpoint. 

I am ok with this change but I don't understand the purpose of it:
why not simply store wal_checkpoint in struct wal then you don't
have to pass it around at all?


-- 
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [tarantool-patches] Re: [PATCH 7/9] box: rewrite checkpoint daemon in C
  2018-11-30  8:58   ` [tarantool-patches] " Konstantin Osipov
@ 2018-11-30  9:41     ` Vladimir Davydov
  2018-12-05 16:21       ` Vladimir Davydov
  0 siblings, 1 reply; 28+ messages in thread
From: Vladimir Davydov @ 2018-11-30  9:41 UTC (permalink / raw)
  To: Konstantin Osipov; +Cc: tarantool-patches

On Fri, Nov 30, 2018 at 11:58:10AM +0300, Konstantin Osipov wrote:
> * Vladimir Davydov <vdavydov.dev@gmail.com> [18/11/28 19:16]:
> > Long time ago, when the checkpoint daemon was added to Tarantool, it was
> > responsible not only for making periodic checkpoints, but also for
> > maintaining the configured number of checkpoints and removing old snap
> > and xlog times, so it was much easier to implement it in Lua than in C.
> files 
> > However, over time, all its responsibilities have been reimplemented in
> > C and moved to the server code so that now it just calls box.snapshot()
> > periodically. Let's rewrite this simple procedure in C as well - this
> > will allow us to easily add more complex logic there, e.g. triggering
> > checkpoint when WAL files exceed a configured threshold.
> > ---
> >  src/box/CMakeLists.txt               |   1 -
> >  src/box/box.cc                       | 102 ++++++++++++++++++++++++
> >  src/box/box.h                        |   1 +
> >  src/box/lua/cfg.cc                   |  12 +++
> >  src/box/lua/checkpoint_daemon.lua    | 136 --------------------------------
> >  src/box/lua/init.c                   |   2 -
> >  src/box/lua/load_cfg.lua             |   2 +-
> >  test/xlog/checkpoint_daemon.result   | 145 -----------------------------------
> 
> Could we please move this fiber to gc.c at least? Let's not
> pollute box?

If we moved it to gc.c, there would be a cross-dependency between gc.c
and box.cc - the daemon needs box_checkpoint defined in box.cc while
box.cc needs gc methods to register consumers. One way to solve this
problem is to move box_checkpoint to gc.c; however, it would look weird
IMO as checkpointing isn't really a part of garbage collection. I guess
we have to introduce a separate file for the checkpoint daemon and
box_checkpoint after all.

> 
> > +		if (box_checkpoint_is_in_progress) {
> > +			/*
> > +			 * The next checkpoint will be scheduled
> > +			 * by the concurrent box_checkpoint().
> > +			 */
> 
> This is rather fragile. Can we make the interaction between
> box-checkpoint and checkpoint daemon less obvious? Imagine in
> future we can perform checkpoints according to a cron. 
> 
> I think it would be better if the logic of next checkpoint time
> calculation is consolidated in the checkpoint daemon alone. If the
> daemon sees that a checkpoint is in progress it can skip the
> current checkpoint, but not delay the next checkpoint till
> infinity.

But then box_checkpoint triggered by exceeding checkpoint_wal_threshold
wouldn't reset the timer set by the checkpoint daemon and the time
interval between checkpoints could be less than configured at times,
which would probably raise questions. Would it be OK? Or we'd better
notify the checkpoint daemon about checkpoints done out of schedule (by
the user or on exceeding checkpoint_wal_threshold)?

> 
> > +			next_checkpoint_time = now + TIMEOUT_INFINITY;
> > +			continue;
> > +		}
> > +		box_checkpoint();
> > +	}
> > +	checkpoint_daemon = NULL;
> > +	return 0;
> > +}
> > +
> > --- a/test/xlog/checkpoint_daemon.test.lua
> > +++ b/test/xlog/checkpoint_daemon.test.lua
> 
> Please keep the test. I don't understand what's wrong with it,
> nor is it obvious from the changeset comments.

The test accesses checkpoint_daemon fiber directly to check the time of
the next scheduled checkpoint. To keep the test we would have to export
the time of the next scheduled checkpoint to box.info. Do we want it?

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [tarantool-patches] Re: [PATCH 8/9] wal: pass struct instead of vclock to checkpoint methods
  2018-11-30  9:00   ` [tarantool-patches] " Konstantin Osipov
@ 2018-11-30  9:43     ` Vladimir Davydov
  2018-12-03 20:20       ` Konstantin Osipov
  0 siblings, 1 reply; 28+ messages in thread
From: Vladimir Davydov @ 2018-11-30  9:43 UTC (permalink / raw)
  To: Konstantin Osipov; +Cc: tarantool-patches

On Fri, Nov 30, 2018 at 12:00:14PM +0300, Konstantin Osipov wrote:
> * Vladimir Davydov <vdavydov.dev@gmail.com> [18/11/28 19:16]:
> > Currently, we only need to pass a vclock between TX and WAL during
> > checkpointing. However, in order to implement auto-checkpointing
> > triggered when WAL size exceeds a certain threshold, we will need to
> > pass some extra info so that we can properly reset the counter
> > accounting the WAL size in the WAL thread. To make it possible, let's
> > move wal_checkpoint struct, which is used internally by WAL to pass a
> > checkpoint vclock, to the header and require the caller to pass it to
> > wal_begin/commit_checkpoint instead of just a vclock.
> 
> wal is a singleton and there could be only one ongoing checkpoint. 
> 
> I am ok with this change but I don't understand the purpose of it:
> why not simply store wal_checkpoint in struct wal then you don't
> have to pass it around at all?

Yeah, we could store the vclock in the WAL thread, but IMO that would
look ugly, because the new member of the wal_writer struct would only be
used during checkpointing and would be meaningless for the rest of the
WAL operation.
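
For readers following the argument, here is a minimal sketch of the
"caller passes a checkpoint struct" approach described in the commit
message. It is not the code from the patch: every sketch_* name is
hypothetical, the vclock is reduced to a single counter, and the idea of
subtracting the sampled size on commit is an assumption about how the
WAL size counter could be "properly reset" without losing bytes written
while the checkpoint was in progress.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical stand-in for a vclock: a single counter. */
struct sketch_vclock {
	int64_t signature;
};

/* Data exchanged between the caller (tx) and WAL for one checkpoint. */
struct sketch_wal_checkpoint {
	/* WAL position at which the checkpoint was started. */
	struct sketch_vclock vclock;
	/* Bytes written since the previous checkpoint, sampled at begin. */
	int64_t wal_size;
};

/* Long-lived WAL state: nothing here is checkpoint-only. */
struct sketch_wal_writer {
	struct sketch_vclock vclock;
	int64_t checkpoint_wal_size;
};

/* Account one write of the given size. */
static void
sketch_wal_write(struct sketch_wal_writer *w, int64_t bytes)
{
	assert(bytes >= 0);
	w->vclock.signature++;
	w->checkpoint_wal_size += bytes;
}

/* Fill the caller-owned struct with the current WAL state. */
static void
sketch_wal_begin_checkpoint(struct sketch_wal_writer *w,
			    struct sketch_wal_checkpoint *c)
{
	c->vclock = w->vclock;
	c->wal_size = w->checkpoint_wal_size;
}

/*
 * Subtract only what was accounted at begin (assumption), so that
 * writes made during the checkpoint still count towards the next one.
 */
static void
sketch_wal_commit_checkpoint(struct sketch_wal_writer *w,
			     const struct sketch_wal_checkpoint *c)
{
	assert(w->checkpoint_wal_size >= c->wal_size);
	w->checkpoint_wal_size -= c->wal_size;
}
```

The checkpoint-only data lives on the caller's stack for the duration of
the checkpoint, which is the property argued for above. Storing it in
the writer would work just as well functionally.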

^ permalink raw reply	[flat|nested] 28+ messages in thread

* [tarantool-patches] Re: [PATCH 8/9] wal: pass struct instead of vclock to checkpoint methods
  2018-11-30  9:43     ` Vladimir Davydov
@ 2018-12-03 20:20       ` Konstantin Osipov
  0 siblings, 0 replies; 28+ messages in thread
From: Konstantin Osipov @ 2018-12-03 20:20 UTC (permalink / raw)
  To: tarantool-patches

* Vladimir Davydov <vdavydov.dev@gmail.com> [18/11/30 13:17]:
> On Fri, Nov 30, 2018 at 12:00:14PM +0300, Konstantin Osipov wrote:
> > * Vladimir Davydov <vdavydov.dev@gmail.com> [18/11/28 19:16]:
> > > Currently, we only need to pass a vclock between TX and WAL during
> > > checkpointing. However, in order to implement auto-checkpointing
> > > triggered when WAL size exceeds a certain threshold, we will need to
> > > pass some extra info so that we can properly reset the counter
> > > > accounting the WAL size in the WAL thread. To make it possible, let's
> > > move wal_checkpoint struct, which is used internally by WAL to pass a
> > > checkpoint vclock, to the header and require the caller to pass it to
> > > wal_begin/commit_checkpoint instead of just a vclock.
> > 
> > wal is a singleton and there could be only one ongoing checkpoint. 
> > 
> > I am ok with this change but I don't understand the purpose of it:
> > why not simply store wal_checkpoint in struct wal then you don't
> > have to pass it around at all?
> 
> Yeah, we could store the vclock in the WAL thread, but IMO that would
> look ugly, because the new member of the wal_writer struct would only be
> used during checkpointing and would be meaningless for the rest of the
> WAL operation.

OK, it's bikeshed.

-- 
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov

^ permalink raw reply	[flat|nested] 28+ messages in thread

* [tarantool-patches] Re: [PATCH 9/9] wal: trigger checkpoint if there are too many WALs
  2018-11-28 16:14 ` [PATCH 9/9] wal: trigger checkpoint if there are too many WALs Vladimir Davydov
@ 2018-12-03 20:34   ` Konstantin Osipov
  2018-12-04 11:25     ` Vladimir Davydov
  0 siblings, 1 reply; 28+ messages in thread
From: Konstantin Osipov @ 2018-12-03 20:34 UTC (permalink / raw)
  To: tarantool-patches

* Vladimir Davydov <vdavydov.dev@gmail.com> [18/11/28 19:16]:

Please avoid using 0 for infinity: Tarantool doesn't use 0 to mean
anything special.

> Closes #1082
> 
> @TarantoolBot document
> Title: Document box.cfg.checkpoint_wal_threshold

Please document the default value of the new variable. 

Please add checks for the range of valid values of the new
variable, as well as tests for these.

> +	int64_t checkpoint_wal_size;
> +	/**
> +	 * If greater than 0

Ugh.
> + , this variable sets a limit on the
> +	 * total size of WAL files written since the last checkpoint.
> +	 * Exceeding it will trigger auto checkpointing in tx.
> +	 */
> +	int64_t checkpoint_threshold;


> +	bool checkpoint_threshold_signalled;
> +	bool checkpoint_threshold_exceeded;

If you had the checkpoint object with all the messages in
the wal writer singleton, then the entire checkpoint state,
including this variable, could be easily observed in a single
place. Now that I see this flag I'm more inclined to insist
on having a singleton wal_checkpoint object, inside struct
wal_writer or standalone.

> +void
> +wal_set_checkpoint_threshold(int64_t checkpoint_threshold)
> +{
> +	struct wal_writer *writer = &wal_writer_singleton;
> +	if (writer->wal_mode == WAL_NONE)
> +		return;
> +	struct wal_set_checkpoint_threshold_msg msg;
> +	msg.checkpoint_threshold = checkpoint_threshold;
> +	bool cancellable = fiber_set_cancellable(false);
> +	cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe,
> +		  &msg.base, wal_set_checkpoint_threshold_f, NULL,
> +		  TIMEOUT_INFINITY);
> +	fiber_set_cancellable(cancellable);
> +}

Please add a comment explaining that WAL_NONE is also set when wal
is not yet initialized.

I don't see where you calculate the value of the variable upon
server start. Did I miss this hunk?

 

-- 
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [tarantool-patches] Re: [PATCH 9/9] wal: trigger checkpoint if there are too many WALs
  2018-12-03 20:34   ` [tarantool-patches] " Konstantin Osipov
@ 2018-12-04 11:25     ` Vladimir Davydov
  2018-12-04 12:53       ` Konstantin Osipov
  0 siblings, 1 reply; 28+ messages in thread
From: Vladimir Davydov @ 2018-12-04 11:25 UTC (permalink / raw)
  To: Konstantin Osipov; +Cc: tarantool-patches

On Mon, Dec 03, 2018 at 11:34:17PM +0300, Konstantin Osipov wrote:
> * Vladimir Davydov <vdavydov.dev@gmail.com> [18/11/28 19:16]:
> 
> Please avoid using 0 for infinity: Tarantool doesn't use 0 to mean
> anything special.

As a matter of fact, we do - setting checkpoint_interval/count to 0
results in infinite checkpoint interval/count. I want to make
checkpoint_wal_threshold consistent with those configuration options.

Anyway, if 0 doesn't mean infinity, what should one set
checkpoint_wal_threshold to to disable this feature? A very large value?
What value? 100 GB, 100 TB? Would look weird in box.cfg IMO.

> 
> > Closes #1082
> > 
> > @TarantoolBot document
> > Title: Document box.cfg.checkpoint_wal_threshold
> 
> Please document the default value of the new variable. 

OK.

> 
> Please add checks for the range of valid values of the new
> variable, as well as tests for these.

We don't check checkpoint_interval - setting it to a value <= 0 means
infinite timeout. So I thought, why bother validating
checkpoint_wal_threshold then?

> 
> > +	int64_t checkpoint_wal_size;
> > +	/**
> > +	 * If greater than 0
> 
> Ugh.
> > + , this variable sets a limit on the
> > +	 * total size of WAL files written since the last checkpoint.
> > +	 * Exceeding it will trigger auto checkpointing in tx.
> > +	 */
> > +	int64_t checkpoint_threshold;
> 
> 
> > +	bool checkpoint_threshold_signalled;
> > +	bool checkpoint_threshold_exceeded;
> 
> If you had the checkpoint object with all the messages in
> the wal writer singleton, then the entire checkpoint state,
> including this variable, could be easily observed in a single
> place. Now that I see this flag I'm more inclined to insist
> on having a singleton wal_checkpoint object, inside struct
> wal_writer or standalone.

I'll remove checkpoint_threshold_exceeded and will use a separate
message for this kind of notifications instead of piggybacking on
a WAL request, as we agreed.

Regarding checkpoint_threshold_signalled, quite frankly, I don't think
that introducing a new checkpoint state struct and putting it in there
would make the code look any better. This flag isn't really bound with
checkpointing - it merely indicates whether we've already triggered a
checkpoint while checkpointing may or may not be in progress.
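
To illustrate what the flag is for, here is a rough sketch of a
"notify once per threshold crossing" counter. The field names mirror the
ones in the quoted hunk, but the struct, both functions, and the
notifications_sent counter (a stand-in for sending a message to tx) are
hypothetical, and resetting the counter to zero on checkpoint is a
simplification.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

struct sketch_wal_counter {
	/* Bytes written to WAL since the last checkpoint. */
	int64_t checkpoint_wal_size;
	/* Limit on checkpoint_wal_size. */
	int64_t checkpoint_threshold;
	/* Set once tx has been told that the limit was exceeded. */
	bool checkpoint_threshold_signalled;
	/* Stand-in for messages sent to tx (hypothetical). */
	int notifications_sent;
};

/* Account a write and notify tx at most once per crossing. */
static void
sketch_account_write(struct sketch_wal_counter *c, int64_t bytes)
{
	c->checkpoint_wal_size += bytes;
	if (c->checkpoint_threshold_signalled)
		return;
	if (c->checkpoint_wal_size > c->checkpoint_threshold) {
		c->checkpoint_threshold_signalled = true;
		c->notifications_sent++;
	}
}

/* A checkpoint has completed: start counting from scratch. */
static void
sketch_checkpoint_done(struct sketch_wal_counter *c)
{
	c->checkpoint_wal_size = 0;
	c->checkpoint_threshold_signalled = false;
}
```

Without the flag, every write after the limit is crossed would produce
another notification until the checkpoint completes. The flag says
nothing about whether a checkpoint is actually running.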

> 
> > +void
> > +wal_set_checkpoint_threshold(int64_t checkpoint_threshold)
> > +{
> > +	struct wal_writer *writer = &wal_writer_singleton;
> > +	if (writer->wal_mode == WAL_NONE)
> > +		return;
> > +	struct wal_set_checkpoint_threshold_msg msg;
> > +	msg.checkpoint_threshold = checkpoint_threshold;
> > +	bool cancellable = fiber_set_cancellable(false);
> > +	cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe,
> > +		  &msg.base, wal_set_checkpoint_threshold_f, NULL,
> > +		  TIMEOUT_INFINITY);
> > +	fiber_set_cancellable(cancellable);
> > +}
> 
> Please add a comment explaining that WAL_NONE is also set when wal
> is not yet initialized.

OK.

> 
> I don't see where you calculate the value of the variable upon
> server start. Did I miss this hunk?

No, it is set by load_cfg.lua, just like box.cfg.checkpoint_interval.

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [tarantool-patches] Re: [PATCH 9/9] wal: trigger checkpoint if there are too many WALs
  2018-12-04 11:25     ` Vladimir Davydov
@ 2018-12-04 12:53       ` Konstantin Osipov
  0 siblings, 0 replies; 28+ messages in thread
From: Konstantin Osipov @ 2018-12-04 12:53 UTC (permalink / raw)
  To: Vladimir Davydov; +Cc: tarantool-patches

* Vladimir Davydov <vdavydov.dev@gmail.com> [18/12/04 15:38]:
> On Mon, Dec 03, 2018 at 11:34:17PM +0300, Konstantin Osipov wrote:
> > * Vladimir Davydov <vdavydov.dev@gmail.com> [18/11/28 19:16]:
> > 
> > Please avoid using 0 for infinity: Tarantool doesn't use 0 to mean
> > anything special.
> 
> As a matter of fact, we do - setting checkpoint_interval/count to 0
> results in infinite checkpoint interval/count.

Actually I could just as well say that it results in 0
checkpoints.

A checkpoint interval of 0 with a non-zero value of checkpoint_count
should be forbidden - it doesn't make any sense.

> I want to make
> checkpoint_wal_threshold consistent with those configuration options.

> Anyway, if 0 doesn't mean infinity, what should one set
> checkpoint_wal_threshold to to disable this feature? A very large value?
> What value? 100 GB, 100 TB? Would look weird in box.cfg IMO.

Yes, please set it to 2^64-1
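
The two conventions debated here can be compared side by side. This is
only a sketch with hypothetical function names, not code from the patch.
Note that the field in the quoted hunk is an int64_t, so a default of
2^64-1 would presumably require an unsigned type (UINT64_MAX); with a
signed field the largest value would be INT64_MAX.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/*
 * Convention A: a threshold <= 0 disables the limit.
 * The check needs a special case for the "disabled" value.
 */
static bool
sketch_exceeded_zero_disables(int64_t wal_size, int64_t threshold)
{
	return threshold > 0 && wal_size > threshold;
}

/*
 * Convention B: the maximum value disables the limit.
 * No special case: no size can be greater than UINT64_MAX.
 */
static bool
sketch_exceeded_max_disables(uint64_t wal_size, uint64_t threshold)
{
	return wal_size > threshold;
}
```

With convention B the comparison is uniform, at the cost of a default
that looks unusual when printed in box.cfg.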


-- 
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov

^ permalink raw reply	[flat|nested] 28+ messages in thread

* Re: [tarantool-patches] Re: [PATCH 7/9] box: rewrite checkpoint daemon in C
  2018-11-30  9:41     ` Vladimir Davydov
@ 2018-12-05 16:21       ` Vladimir Davydov
  0 siblings, 0 replies; 28+ messages in thread
From: Vladimir Davydov @ 2018-12-05 16:21 UTC (permalink / raw)
  To: Konstantin Osipov; +Cc: tarantool-patches

Discussed verbally. For the record, see below.

On Fri, Nov 30, 2018 at 12:41:41PM +0300, Vladimir Davydov wrote:
> On Fri, Nov 30, 2018 at 11:58:10AM +0300, Konstantin Osipov wrote:
> > * Vladimir Davydov <vdavydov.dev@gmail.com> [18/11/28 19:16]:
> > > Long time ago, when the checkpoint daemon was added to Tarantool, it was
> > > responsible not only for making periodic checkpoints, but also for
> > > maintaining the configured number of checkpoints and removing old snap
> > > and xlog times, so it was much easier to implement it in Lua than in C.
> > files 
> > > However, over time, all its responsibilities have been reimplemented in
> > > C and moved to the server code so that now it just calls box.snapshot()
> > > periodically. Let's rewrite this simple procedure in C as well - this
> > > will allow us to easily add more complex logic there, e.g. triggering
> > > checkpoint when WAL files exceed a configured threshold.
> > > ---
> > >  src/box/CMakeLists.txt               |   1 -
> > >  src/box/box.cc                       | 102 ++++++++++++++++++++++++
> > >  src/box/box.h                        |   1 +
> > >  src/box/lua/cfg.cc                   |  12 +++
> > >  src/box/lua/checkpoint_daemon.lua    | 136 --------------------------------
> > >  src/box/lua/init.c                   |   2 -
> > >  src/box/lua/load_cfg.lua             |   2 +-
> > >  test/xlog/checkpoint_daemon.result   | 145 -----------------------------------
> > 
> > Could we please move this fiber to gc.c at least? Let's not
> > pollute box?
> 
> If we moved it to gc.c, there would be a cross-dependency between gc.c
> and box.cc - the daemon needs box_checkpoint defined in box.cc while
> box.cc needs gc methods to register consumers. One way to solve this
> problem is move box_checkpoint to gc.c, however, it would look weird
> IMO as checkpointing isn't really a part of garbage collection. I guess
> we have to introduce a separate file for the checkpoint daemon and
> box_checkpoint after all.

Agreed to move the checkpoint daemon to gc.c. To avoid introducing a
cross-dependency between box.cc and gc.c, gc_make_checkpoint() will be
introduced. The new method will do the work currently done by
box_checkpoint(), namely call the engine and WAL checkpoint callbacks.
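
A very rough sketch of how such a helper could avoid depending on
box.cc: the gc module only sees callbacks. Only the name
gc_make_checkpoint() comes from the text above. The struct, the callback
type, the error convention, and the counting callback are all
hypothetical, and the real engine and WAL checkpoint steps are more
involved than a single call each.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical callback type: returns 0 on success, -1 on error. */
typedef int (*sketch_checkpoint_cb)(void *arg);

struct sketch_gc {
	/* Supplied by the owner, so gc needs no box.cc symbols. */
	sketch_checkpoint_cb engine_checkpoint;
	sketch_checkpoint_cb wal_checkpoint;
	void *cb_arg;
	bool checkpoint_is_in_progress;
};

/* Example callback for demonstration: counts invocations. */
static int
sketch_count_call(void *arg)
{
	int *calls = arg;
	(*calls)++;
	return 0;
}

/* Run the engine step, then the WAL step; refuse to run twice. */
static int
sketch_gc_make_checkpoint(struct sketch_gc *gc)
{
	if (gc->checkpoint_is_in_progress)
		return -1;
	gc->checkpoint_is_in_progress = true;
	int rc = gc->engine_checkpoint(gc->cb_arg);
	if (rc == 0)
		rc = gc->wal_checkpoint(gc->cb_arg);
	gc->checkpoint_is_in_progress = false;
	return rc;
}
```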

> 
> > 
> > > +		if (box_checkpoint_is_in_progress) {
> > > +			/*
> > > +			 * The next checkpoint will be scheduled
> > > +			 * by the concurrent box_checkpoint().
> > > +			 */
> > 
> > This is rather fragile. Can we make the interaction between
> > box-checkpoint and checkpoint daemon less obvious? Imagine in
> > future we can perform checkpoints according to a cron. 
> > 
> > I think it would be better if the logic of next checkpoint time
> > calculation is consolidated in the checkpoint daemon alone. If the
> > daemon sees that a checkpoint is in progress it can skip the
> > current checkpoint, but not delay the next checkpoint till
> > infinity.
> 
> But then box_checkpoint triggered by exceeding checkpoint_wal_threshold
> wouldn't reset the timer set by the checkpoint daemon and the time
> interval between checkpoints could be less than configured at times,
> which would probably raise questions. Would it be OK? Or we'd better
> notify the checkpoint daemon about checkpoints done out of schedule (by
> the user or on exceeding checkpoint_wal_threshold)?

We must notify the checkpoint daemon to reschedule the next checkpoint
whenever the user makes a checkpoint with box.snapshot(). That is, there
should be gc_schedule_checkpoint() helper that will schedule the next
checkpoint. This method will be used both by the checkpoint daemon and
by box.snapshot() in order to schedule the next checkpoint according to
box.cfg.checkpoint_interval.
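
The rescheduling rule can be sketched as a small piece of state shared
by the daemon and box.snapshot(). Only the idea of a common scheduling
helper comes from the text above. The struct, the sketch_* functions,
and the placeholder for an "infinite" timeout are assumptions, and time
is passed in explicitly to keep the example deterministic.

```c
#include <assert.h>

/* Placeholder for "never" (assumption): about 100 years in seconds. */
#define SKETCH_TIMEOUT_INFINITY (365.0 * 86400 * 100)

struct sketch_checkpoint_schedule {
	/* checkpoint_interval in seconds; <= 0 means no periodic runs. */
	double interval;
	/* Absolute time of the next scheduled checkpoint. */
	double next_time;
};

/*
 * Called after every checkpoint, whether made by the daemon or by
 * the user, so the interval is always counted from the last one.
 */
static void
sketch_schedule_checkpoint(struct sketch_checkpoint_schedule *s, double now)
{
	if (s->interval > 0)
		s->next_time = now + s->interval;
	else
		s->next_time = now + SKETCH_TIMEOUT_INFINITY;
}

/* How long the daemon should sleep before the next checkpoint. */
static double
sketch_schedule_timeout(const struct sketch_checkpoint_schedule *s,
			double now)
{
	double timeout = s->next_time - now;
	return timeout > 0 ? timeout : 0;
}
```

For example, with an interval of 3600 seconds and a user checkpoint at
t = 1000, the next scheduled checkpoint moves from t = 3600 to t = 4600.
Logic of this shape has no dependency on fibers and is easy to cover
with a unit test.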

> 
> > 
> > > +			next_checkpoint_time = now + TIMEOUT_INFINITY;
> > > +			continue;
> > > +		}
> > > +		box_checkpoint();
> > > +	}
> > > +	checkpoint_daemon = NULL;
> > > +	return 0;
> > > +}
> > > +
> > > --- a/test/xlog/checkpoint_daemon.test.lua
> > > +++ b/test/xlog/checkpoint_daemon.test.lua
> > 
> > Please keep the test. I don't understand what's wrong with it,
> > neither it is obvious from changeset comments.
> 
> The test accesses checkpoint_daemon fiber directly to check the time of
> the next scheduled checkpoint. To keep the test we would have to export
> the time of the next scheduled checkpoint to box.info. Do we want it?

Checkpoint scheduling logic should be moved to a separate, independent
module. We should write a unit test for this module.

^ permalink raw reply	[flat|nested] 28+ messages in thread

end of thread, other threads:[~2018-12-05 16:21 UTC | newest]

Thread overview: 28+ messages
2018-11-28 16:14 [PATCH 0/9] Allow to limit size of WAL files Vladimir Davydov
2018-11-28 16:14 ` [PATCH 1/9] wal: separate checkpoint and flush paths Vladimir Davydov
2018-11-29 16:24   ` [tarantool-patches] " Konstantin Osipov
2018-11-28 16:14 ` [PATCH 2/9] wal: remove files needed for recovery from backup checkpoints on ENOSPC Vladimir Davydov
2018-11-29 16:31   ` [tarantool-patches] " Konstantin Osipov
2018-11-29 17:42     ` Vladimir Davydov
2018-11-28 16:14 ` [PATCH 3/9] recovery: restore garbage collector vclock after restart Vladimir Davydov
2018-11-29 16:37   ` [tarantool-patches] " Konstantin Osipov
2018-11-29 17:42     ` Vladimir Davydov
2018-11-28 16:14 ` [PATCH 4/9] gc: run garbage collection in background Vladimir Davydov
2018-11-29 16:42   ` [tarantool-patches] " Konstantin Osipov
2018-11-29 17:43     ` Vladimir Davydov
2018-11-28 16:14 ` [PATCH 5/9] gc: do not use WAL watcher API for deactivating stale consumers Vladimir Davydov
2018-11-29 17:02   ` [tarantool-patches] " Konstantin Osipov
2018-11-28 16:14 ` [PATCH 6/9] wal: simplify watcher API Vladimir Davydov
2018-11-29 17:33   ` [tarantool-patches] " Konstantin Osipov
2018-11-28 16:14 ` [PATCH 7/9] box: rewrite checkpoint daemon in C Vladimir Davydov
2018-11-30  8:58   ` [tarantool-patches] " Konstantin Osipov
2018-11-30  9:41     ` Vladimir Davydov
2018-12-05 16:21       ` Vladimir Davydov
2018-11-28 16:14 ` [PATCH 8/9] wal: pass struct instead of vclock to checkpoint methods Vladimir Davydov
2018-11-30  9:00   ` [tarantool-patches] " Konstantin Osipov
2018-11-30  9:43     ` Vladimir Davydov
2018-12-03 20:20       ` Konstantin Osipov
2018-11-28 16:14 ` [PATCH 9/9] wal: trigger checkpoint if there are too many WALs Vladimir Davydov
2018-12-03 20:34   ` [tarantool-patches] " Konstantin Osipov
2018-12-04 11:25     ` Vladimir Davydov
2018-12-04 12:53       ` Konstantin Osipov
