[PATCH 7/9] box: rewrite checkpoint daemon in C

Vladimir Davydov vdavydov.dev at gmail.com
Wed Nov 28 19:14:45 MSK 2018


A long time ago, when the checkpoint daemon was added to Tarantool, it
was responsible not only for making periodic checkpoints, but also for
maintaining the configured number of checkpoints and removing old snap
and xlog files, so it was much easier to implement it in Lua than in C.
However, over time, all its responsibilities have been reimplemented in
C and moved to the server code so that now it just calls box.snapshot()
periodically. Let's rewrite this simple procedure in C as well - this
will allow us to easily add more complex logic there, e.g. triggering
checkpoint when WAL files exceed a configured threshold.
---
 src/box/CMakeLists.txt               |   1 -
 src/box/box.cc                       | 102 ++++++++++++++++++++++++
 src/box/box.h                        |   1 +
 src/box/lua/cfg.cc                   |  12 +++
 src/box/lua/checkpoint_daemon.lua    | 136 --------------------------------
 src/box/lua/init.c                   |   2 -
 src/box/lua/load_cfg.lua             |   2 +-
 test/xlog/checkpoint_daemon.result   | 145 -----------------------------------
 test/xlog/checkpoint_daemon.test.lua |  56 --------------
 9 files changed, 116 insertions(+), 341 deletions(-)
 delete mode 100644 src/box/lua/checkpoint_daemon.lua

diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt
index d1276472..b2aaa1c3 100644
--- a/src/box/CMakeLists.txt
+++ b/src/box/CMakeLists.txt
@@ -7,7 +7,6 @@ lua_source(lua_sources lua/load_cfg.lua)
 lua_source(lua_sources lua/schema.lua)
 lua_source(lua_sources lua/tuple.lua)
 lua_source(lua_sources lua/session.lua)
-lua_source(lua_sources lua/checkpoint_daemon.lua)
 lua_source(lua_sources lua/feedback_daemon.lua)
 lua_source(lua_sources lua/net_box.lua)
 lua_source(lua_sources lua/upgrade.lua)
diff --git a/src/box/box.cc b/src/box/box.cc
index 20412af4..7cb96cd6 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -91,6 +91,22 @@ bool box_checkpoint_is_in_progress = false;
 const struct vclock *box_vclock = &replicaset.vclock;
 
 /**
+ * Checkpoint daemon fiber; NULL when the daemon isn't running.
+ */
+static struct fiber *checkpoint_daemon;
+
+/**
+ * Periodic checkpoint interval, in seconds; disabled if <= 0.
+ */
+static double checkpoint_interval;
+
+/**
+ * Time of the next checkpoint to be performed by the checkpoint
+ * daemon, as measured by the ev_monotonic_now() clock.
+ */
+static double next_checkpoint_time;
+
+/**
  * Set if backup is in progress, i.e. box_backup_start() was
  * called but box_backup_stop() hasn't been yet.
  */
@@ -361,6 +377,108 @@ apply_initial_join_row(struct xstream *stream, struct xrow_header *row)
 	space_apply_initial_join_row_xc(space, &request);
 }
 
+/**
+ * Body of the checkpoint daemon fiber.
+ *
+ * Sleeps until next_checkpoint_time and then calls
+ * box_checkpoint(). The deadline is re-read after every
+ * wakeup, so it is enough to update next_checkpoint_time and
+ * wake the fiber up to reschedule. Runs until the fiber is
+ * cancelled and resets checkpoint_daemon to NULL on exit.
+ *
+ * @param ap  Unused.
+ * @retval 0  Always.
+ */
+static int
+checkpoint_daemon_f(va_list ap)
+{
+	(void)ap;
+	assert(checkpoint_daemon == fiber());
+	while (!fiber_is_cancelled()) {
+		double now = ev_monotonic_now(loop());
+		if (now < next_checkpoint_time) {
+			fiber_sleep(next_checkpoint_time - now);
+			continue;
+		}
+		if (box_checkpoint_is_in_progress) {
+			/*
+			 * The next checkpoint will be scheduled
+			 * by the concurrent box_checkpoint().
+			 */
+			next_checkpoint_time = now + TIMEOUT_INFINITY;
+			continue;
+		}
+		/*
+		 * A failed checkpoint must not stop the daemon, but
+		 * the reason shouldn't be lost either, so log it.
+		 */
+		if (box_checkpoint() != 0)
+			diag_log();
+	}
+	checkpoint_daemon = NULL;
+	return 0;
+}
+
+/**
+ * Create and start the checkpoint daemon fiber.
+ *
+ * The next checkpoint time is set to a very large value, so
+ * the daemon stays idle until a checkpoint is scheduled with
+ * schedule_next_checkpoint(). Panics if the fiber can't be
+ * created.
+ */
+static void
+start_checkpoint_daemon(void)
+{
+	assert(checkpoint_daemon == NULL);
+	checkpoint_daemon = fiber_new("checkpoint_daemon", checkpoint_daemon_f);
+	if (checkpoint_daemon == NULL)
+		panic("failed to start checkpoint daemon");
+	next_checkpoint_time = ev_monotonic_now(loop()) + TIMEOUT_INFINITY;
+	fiber_wakeup(checkpoint_daemon);
+}
+
+/**
+ * Schedule the next checkpoint to be made by the daemon.
+ *
+ * Updates next_checkpoint_time, wakes up the checkpoint daemon
+ * so that it notices the new deadline, and logs the wall clock
+ * time the checkpoint is scheduled for. Does nothing if the
+ * daemon isn't running.
+ *
+ * @param timeout  Delay before the next checkpoint, in seconds.
+ */
+static void
+schedule_next_checkpoint(double timeout)
+{
+	if (checkpoint_daemon == NULL)
+		return;
+
+	next_checkpoint_time = ev_monotonic_now(loop()) + timeout;
+	/*
+	 * When called by the daemon itself, there's no need for a
+	 * wakeup: the daemon re-reads the deadline on its next
+	 * loop iteration.
+	 */
+	if (checkpoint_daemon != fiber())
+		fiber_wakeup(checkpoint_daemon);
+
+	/*
+	 * The deadline is on the monotonic clock, but the log
+	 * message shows calendar time, hence ev_now() here.
+	 */
+	char buf[128];
+	struct tm tm;
+	time_t when = (time_t)ev_now(loop()) + timeout;
+	if (localtime_r(&when, &tm) == NULL ||
+	    strftime(buf, sizeof(buf), "%c", &tm) == 0) {
+		/* Never print an uninitialized buffer. */
+		say_info("scheduled next checkpoint in %.0f seconds", timeout);
+		return;
+	}
+	say_info("scheduled next checkpoint for %s", buf);
+}
+
 /* {{{ configuration bindings */
 
 static void
@@ -844,6 +914,30 @@ box_set_readahead(void)
 }
 
 void
+box_set_checkpoint_interval(void)
+{
+	checkpoint_interval = cfg_getd("checkpoint_interval");
+	if (checkpoint_interval > 0) {
+		/*
+		 * Delay the first checkpoint by a random fraction of
+		 * the interval so that instances running on the same
+		 * host don't all checkpoint at once. No integer cast:
+		 * the interval may be fractional or exceed INT_MAX.
+		 */
+		double offset = checkpoint_interval * rand() / RAND_MAX;
+		schedule_next_checkpoint(checkpoint_interval + offset);
+	} else {
+		/*
+		 * Effectively disable periodic checkpointing by
+		 * setting the next checkpoint time to a very large
+		 * value.
+		 */
+		next_checkpoint_time = ev_monotonic_now(loop()) +
+						TIMEOUT_INFINITY;
+	}
+}
+
+void
 box_set_checkpoint_count(void)
 {
 	int checkpoint_count = cfg_geti("checkpoint_count");
@@ -2141,6 +2235,7 @@ box_cfg_xc(void)
 	replicaset_follow();
 
 	sql_load_schema();
+	start_checkpoint_daemon();
 
 	fiber_gc();
 	is_box_configured = true;
@@ -2210,6 +2305,13 @@ end:
 	box_checkpoint_is_in_progress = false;
 
 	/*
+	 * Schedule the next checkpoint if periodic checkpointing
+	 * is configured.
+	 */
+	if (checkpoint_interval > 0)
+		schedule_next_checkpoint(checkpoint_interval);
+
+	/*
 	 * Wait for background garbage collection that might
 	 * have been triggered by this checkpoint to complete.
 	 * Strictly speaking, it isn't necessary, but it
diff --git a/src/box/box.h b/src/box/box.h
index 7712c192..9bde583d 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -199,6 +199,7 @@ void box_set_snap_io_rate_limit(void);
 void box_set_too_long_threshold(void);
 void box_set_readahead(void);
 void box_set_checkpoint_count(void);
+void box_set_checkpoint_interval(void);
 void box_set_memtx_memory(void);
 void box_set_memtx_max_tuple_size(void);
 void box_set_vinyl_memory(void);
diff --git a/src/box/lua/cfg.cc b/src/box/lua/cfg.cc
index c3825591..4f08c78e 100644
--- a/src/box/lua/cfg.cc
+++ b/src/box/lua/cfg.cc
@@ -165,6 +165,17 @@ lbox_cfg_set_checkpoint_count(struct lua_State *L)
 }
 
 static int
+lbox_cfg_set_checkpoint_interval(struct lua_State *L)
+{
+	try {
+		box_set_checkpoint_interval();
+	} catch (Exception *) {
+		luaT_error(L); /* presumably re-raises it as a Lua error */
+	}
+	return 0; /* no Lua return values */
+}
+
+static int
 lbox_cfg_set_read_only(struct lua_State *L)
 {
 	try {
@@ -340,6 +351,7 @@ box_lua_cfg_init(struct lua_State *L)
 		{"cfg_set_too_long_threshold", lbox_cfg_set_too_long_threshold},
 		{"cfg_set_snap_io_rate_limit", lbox_cfg_set_snap_io_rate_limit},
 		{"cfg_set_checkpoint_count", lbox_cfg_set_checkpoint_count},
+		{"cfg_set_checkpoint_interval", lbox_cfg_set_checkpoint_interval},
 		{"cfg_set_read_only", lbox_cfg_set_read_only},
 		{"cfg_set_memtx_memory", lbox_cfg_set_memtx_memory},
 		{"cfg_set_memtx_max_tuple_size", lbox_cfg_set_memtx_max_tuple_size},
diff --git a/src/box/lua/checkpoint_daemon.lua b/src/box/lua/checkpoint_daemon.lua
deleted file mode 100644
index 576c4a5c..00000000
--- a/src/box/lua/checkpoint_daemon.lua
+++ /dev/null
@@ -1,136 +0,0 @@
--- checkpoint_daemon.lua (internal file)
-
-local log = require 'log'
-local fiber = require 'fiber'
-local fio = require 'fio'
-local yaml = require 'yaml'
-local errno = require 'errno'
-local digest = require 'digest'
-local pickle = require 'pickle'
-
-local PREFIX = 'checkpoint_daemon'
-
-local daemon = {
-    checkpoint_interval = 0;
-    fiber = nil;
-    control = nil;
-}
-
--- create snapshot, return true if no errors
-local function snapshot()
-    log.info("making snapshot...")
-    local s, e = pcall(function() box.snapshot() end)
-    if s then
-        return true
-    end
-    -- don't complain in the log if the snapshot already exists
-    if errno() == errno.EEXIST then
-        return false
-    end
-    log.error("error while creating snapshot: %s", e)
-    return false
-end
-
--- check filesystem and current time
-local function process(self)
-
-    if daemon.checkpoint_interval == nil then
-        return false
-    end
-
-    if not(daemon.checkpoint_interval > 0) then
-        return false
-    end
-
-    local checkpoints = box.info.gc().checkpoints
-    local last_checkpoint = checkpoints[#checkpoints]
-
-    local last_snap = fio.pathjoin(box.cfg.memtx_dir,
-            string.format('%020d.snap', last_checkpoint.signature))
-    local snstat = fio.stat(last_snap)
-    if snstat == nil then
-        log.error("can't stat %s: %s", last_snap, errno.strerror())
-        return false
-    end
-    if snstat.mtime + daemon.checkpoint_interval <= fiber.time() then
-        return snapshot()
-    end
-end
-
-local function daemon_fiber(self)
-    fiber.name(PREFIX, {truncate = true})
-    log.info("started")
-
-    --
-    -- Add random offset to the initial period to avoid simultaneous
-    -- snapshotting when multiple instances of tarantool are running
-    -- on the same host.
-    -- See https://github.com/tarantool/tarantool/issues/732
-    --
-    local random = pickle.unpack('i', digest.urandom(4))
-    local offset = random % self.checkpoint_interval
-    while true do
-        local period = self.checkpoint_interval + offset
-        -- maintain next_snapshot_time as a self member for testing purposes
-        self.next_snapshot_time = fiber.time() + period
-        log.info("scheduled the next snapshot at %s",
-                os.date("%c", self.next_snapshot_time))
-        local msg = self.control:get(period)
-        if msg == 'shutdown' then
-            break
-        elseif msg == 'reload' then
-            offset = random % self.checkpoint_interval
-            log.info("reloaded") -- continue
-        elseif msg == nil and box.info.status == 'running' then
-            local s, e = pcall(process, self)
-            if not s then
-                log.error(e)
-            end
-            offset = 0
-        end
-    end
-    self.next_snapshot_time = nil
-    log.info("stopped")
-end
-
-local function reload(self)
-    if self.checkpoint_interval > 0 then
-        if self.control == nil then
-            -- Start daemon
-            self.control = fiber.channel()
-            self.fiber = fiber.create(daemon_fiber, self)
-            fiber.sleep(0)
-        else
-            -- Reload daemon
-            self.control:put("reload")
-            --
-            -- channel:put() doesn't block the writer if there
-            -- is a ready reader. Give daemon fiber way so that
-            -- it can execute before reload() returns to the caller.
-            --
-            fiber.sleep(0)
-        end
-    elseif self.control ~= nil then
-        -- Shutdown daemon
-        self.control:put("shutdown")
-        self.fiber = nil
-        self.control = nil
-        fiber.sleep(0) -- see comment above
-    end
-end
-
-setmetatable(daemon, {
-    __index = {
-        set_checkpoint_interval = function()
-            daemon.checkpoint_interval = box.cfg.checkpoint_interval
-            reload(daemon)
-            return
-        end,
-    }
-})
-
-if box.internal == nil then
-    box.internal = { [PREFIX] = daemon }
-else
-    box.internal[PREFIX] = daemon
-end
diff --git a/src/box/lua/init.c b/src/box/lua/init.c
index ccb4c6a4..0e90f6be 100644
--- a/src/box/lua/init.c
+++ b/src/box/lua/init.c
@@ -65,7 +65,6 @@ extern char session_lua[],
 	schema_lua[],
 	load_cfg_lua[],
 	xlog_lua[],
-	checkpoint_daemon_lua[],
 	feedback_daemon_lua[],
 	net_box_lua[],
 	upgrade_lua[],
@@ -75,7 +74,6 @@ static const char *lua_sources[] = {
 	"box/session", session_lua,
 	"box/tuple", tuple_lua,
 	"box/schema", schema_lua,
-	"box/checkpoint_daemon", checkpoint_daemon_lua,
 	"box/feedback_daemon", feedback_daemon_lua,
 	"box/upgrade", upgrade_lua,
 	"box/net_box", net_box_lua,
diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua
index fd99206f..a9e6fe07 100644
--- a/src/box/lua/load_cfg.lua
+++ b/src/box/lua/load_cfg.lua
@@ -227,7 +227,7 @@ local dynamic_cfg = {
     vinyl_cache             = private.cfg_set_vinyl_cache,
     vinyl_timeout           = private.cfg_set_vinyl_timeout,
     checkpoint_count        = private.cfg_set_checkpoint_count,
-    checkpoint_interval     = private.checkpoint_daemon.set_checkpoint_interval,
+    checkpoint_interval     = private.cfg_set_checkpoint_interval,
     worker_pool_threads     = private.cfg_set_worker_pool_threads,
     feedback_enabled        = private.feedback_daemon.set_feedback_params,
     feedback_host           = private.feedback_daemon.set_feedback_params,
diff --git a/test/xlog/checkpoint_daemon.result b/test/xlog/checkpoint_daemon.result
index 3a75137d..f1d8690a 100644
--- a/test/xlog/checkpoint_daemon.result
+++ b/test/xlog/checkpoint_daemon.result
@@ -151,148 +151,3 @@ box.cfg{checkpoint_interval = 3600 * 4, checkpoint_count = 4 }
 space:drop()
 ---
 ...
-daemon = box.internal.checkpoint_daemon
----
-...
--- stop daemon
-box.cfg{ checkpoint_interval = 0 }
----
-...
--- wait daemon to stop
-while daemon.fiber ~= nil do fiber.sleep(0) end
----
-...
-daemon.fiber == nil
----
-- true
-...
--- start daemon
-box.cfg{ checkpoint_interval = 10 }
----
-...
-daemon.fiber ~= nil
----
-- true
-...
--- reload configuration
-box.cfg{ checkpoint_interval = 15, checkpoint_count = 20 }
----
-...
-daemon.checkpoint_interval == 15
----
-- true
-...
-daemon.checkpoint_count = 20
----
-...
--- Check that checkpoint_count can't be < 1.
-box.cfg{ checkpoint_count = 1 }
----
-...
-box.cfg{ checkpoint_count = 0 }
----
-- error: 'Incorrect value for option ''checkpoint_count'': the value must not be less
-    than one'
-...
-box.cfg.checkpoint_count
----
-- 1
-...
--- Start
-PERIOD = 3600
----
-...
-box.cfg{ checkpoint_count = 2, checkpoint_interval = PERIOD}
----
-...
-snapshot_time, time  = daemon.next_snapshot_time, fiber.time()
----
-...
-snapshot_time + 1 >= time + PERIOD or {snapshot_time, time, PERIOD}
----
-- true
-...
-snapshot_time - 1 <= time + 2 * PERIOD or {snapshot_time, time, PERIOD}
----
-- true
-...
-daemon_fiber = daemon.fiber
----
-...
-daemon_control = daemon.control
----
-...
--- Reload #1
-PERIOD = 100
----
-...
-box.cfg{ checkpoint_count = 2, checkpoint_interval = PERIOD}
----
-...
-snapshot_time, time  = daemon.next_snapshot_time, fiber.time()
----
-...
-snapshot_time + 1 >= time + PERIOD or {snapshot_time, time, PERIOD}
----
-- true
-...
-snapshot_time - 1 <= time + 2 * PERIOD or {snapshot_time, time, PERIOD}
----
-- true
-...
-daemon.fiber == daemon_fiber
----
-- true
-...
-daemon.control == daemon_control
----
-- true
-...
--- Reload #2
-PERIOD = 1000
----
-...
-box.cfg{ checkpoint_count = 2, checkpoint_interval = PERIOD}
----
-...
-snapshot_time, time  = daemon.next_snapshot_time, fiber.time()
----
-...
-snapshot_time + 1 >= time + PERIOD or {snapshot_time, time, PERIOD}
----
-- true
-...
-snapshot_time - 1 <= time + 2 * PERIOD or {snapshot_time, time, PERIOD}
----
-- true
-...
-daemon.fiber == daemon_fiber
----
-- true
-...
-daemon.control == daemon_control
----
-- true
-...
-daemon_control = nil
----
-...
-daemin_fiber = nil
----
-...
--- Shutdown
-box.cfg{ checkpoint_count = 2, checkpoint_interval = 0}
----
-...
-daemon.next_snapshot_time
----
-- null
-...
-daemon.fiber == nil
----
-- true
-...
-daemon.control == nil
----
-- true
-...
diff --git a/test/xlog/checkpoint_daemon.test.lua b/test/xlog/checkpoint_daemon.test.lua
index f3490621..b67879c4 100644
--- a/test/xlog/checkpoint_daemon.test.lua
+++ b/test/xlog/checkpoint_daemon.test.lua
@@ -88,59 +88,3 @@ test_run:cmd("setopt delimiter ''");
 -- restore default options
 box.cfg{checkpoint_interval = 3600 * 4, checkpoint_count = 4 }
 space:drop()
-
-daemon = box.internal.checkpoint_daemon
--- stop daemon
-box.cfg{ checkpoint_interval = 0 }
--- wait daemon to stop
-while daemon.fiber ~= nil do fiber.sleep(0) end
-daemon.fiber == nil
--- start daemon
-box.cfg{ checkpoint_interval = 10 }
-daemon.fiber ~= nil
--- reload configuration
-box.cfg{ checkpoint_interval = 15, checkpoint_count = 20 }
-daemon.checkpoint_interval == 15
-daemon.checkpoint_count = 20
-
--- Check that checkpoint_count can't be < 1.
-box.cfg{ checkpoint_count = 1 }
-box.cfg{ checkpoint_count = 0 }
-box.cfg.checkpoint_count
-
--- Start
-PERIOD = 3600
-box.cfg{ checkpoint_count = 2, checkpoint_interval = PERIOD}
-snapshot_time, time  = daemon.next_snapshot_time, fiber.time()
-snapshot_time + 1 >= time + PERIOD or {snapshot_time, time, PERIOD}
-snapshot_time - 1 <= time + 2 * PERIOD or {snapshot_time, time, PERIOD}
-
-daemon_fiber = daemon.fiber
-daemon_control = daemon.control
-
--- Reload #1
-PERIOD = 100
-box.cfg{ checkpoint_count = 2, checkpoint_interval = PERIOD}
-snapshot_time, time  = daemon.next_snapshot_time, fiber.time()
-snapshot_time + 1 >= time + PERIOD or {snapshot_time, time, PERIOD}
-snapshot_time - 1 <= time + 2 * PERIOD or {snapshot_time, time, PERIOD}
-daemon.fiber == daemon_fiber
-daemon.control == daemon_control
-
--- Reload #2
-PERIOD = 1000
-box.cfg{ checkpoint_count = 2, checkpoint_interval = PERIOD}
-snapshot_time, time  = daemon.next_snapshot_time, fiber.time()
-snapshot_time + 1 >= time + PERIOD or {snapshot_time, time, PERIOD}
-snapshot_time - 1 <= time + 2 * PERIOD or {snapshot_time, time, PERIOD}
-daemon.fiber == daemon_fiber
-daemon.control == daemon_control
-
-daemon_control = nil
-daemin_fiber = nil
-
--- Shutdown
-box.cfg{ checkpoint_count = 2, checkpoint_interval = 0}
-daemon.next_snapshot_time
-daemon.fiber == nil
-daemon.control == nil
-- 
2.11.0




More information about the Tarantool-patches mailing list