From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: From: Vladimir Davydov Subject: [PATCH v2 08/10] Rewrite checkpoint daemon in C Date: Sat, 8 Dec 2018 18:48:12 +0300 Message-Id: In-Reply-To: References: In-Reply-To: References: To: kostja@tarantool.org Cc: tarantool-patches@freelists.org List-ID: A long time ago, when the checkpoint daemon was added to Tarantool, it was responsible not only for making periodic checkpoints, but also for maintaining the configured number of checkpoints and removing old snap and xlog files, so it was much easier to implement it in Lua than in C. However, over time, all its responsibilities have been reimplemented in C and moved to the server code so that now it just calls box.snapshot() periodically. Let's rewrite this simple procedure in C as well - this will allow us to easily add more complex logic there, e.g. triggering a checkpoint when WAL files exceed a configured threshold. Note that this patch removes a few cases from the xlog/checkpoint_daemon test that tested the internal state of the checkpoint daemon, which isn't available in Lua anymore. This is OK as those cases are covered by the unit/checkpoint_schedule test. 
--- src/box/CMakeLists.txt | 1 - src/box/box.cc | 7 ++ src/box/box.h | 1 + src/box/gc.c | 105 ++++++++++++++++++++++--- src/box/gc.h | 12 +++ src/box/lua/cfg.cc | 12 +++ src/box/lua/checkpoint_daemon.lua | 136 --------------------------------- src/box/lua/init.c | 2 - src/box/lua/load_cfg.lua | 2 +- test/xlog/checkpoint_daemon.result | 143 +++-------------------------------- test/xlog/checkpoint_daemon.test.lua | 61 ++------------- 11 files changed, 146 insertions(+), 336 deletions(-) delete mode 100644 src/box/lua/checkpoint_daemon.lua diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt index d7a52c5e..5260092f 100644 --- a/src/box/CMakeLists.txt +++ b/src/box/CMakeLists.txt @@ -7,7 +7,6 @@ lua_source(lua_sources lua/load_cfg.lua) lua_source(lua_sources lua/schema.lua) lua_source(lua_sources lua/tuple.lua) lua_source(lua_sources lua/session.lua) -lua_source(lua_sources lua/checkpoint_daemon.lua) lua_source(lua_sources lua/feedback_daemon.lua) lua_source(lua_sources lua/net_box.lua) lua_source(lua_sources lua/upgrade.lua) diff --git a/src/box/box.cc b/src/box/box.cc index 121ad787..771f2b8c 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -851,6 +851,13 @@ box_set_checkpoint_count(void) } void +box_set_checkpoint_interval(void) +{ + double interval = cfg_getd("checkpoint_interval"); + gc_set_checkpoint_interval(interval); +} + +void box_set_vinyl_memory(void) { struct vinyl_engine *vinyl; diff --git a/src/box/box.h b/src/box/box.h index 6de0691d..91e41a9d 100644 --- a/src/box/box.h +++ b/src/box/box.h @@ -194,6 +194,7 @@ void box_set_snap_io_rate_limit(void); void box_set_too_long_threshold(void); void box_set_readahead(void); void box_set_checkpoint_count(void); +void box_set_checkpoint_interval(void); void box_set_memtx_memory(void); void box_set_memtx_max_tuple_size(void); void box_set_vinyl_memory(void); diff --git a/src/box/gc.c b/src/box/gc.c index e1b23eed..e8074078 100644 --- a/src/box/gc.c +++ b/src/box/gc.c @@ -39,10 +39,12 @@ #include 
#include #include +#include #define RB_COMPACT 1 #include #include +#include #include "diag.h" #include "errcode.h" @@ -55,11 +57,14 @@ #include "schema.h" #include "engine.h" /* engine_collect_garbage() */ #include "wal.h" /* wal_collect_garbage() */ +#include "checkpoint_schedule.h" struct gc_state gc; static int gc_cleanup_fiber_f(va_list); +static int +gc_checkpoint_fiber_f(va_list); /** * Comparator used for ordering gc_consumer objects by signature @@ -108,12 +113,19 @@ gc_init(void) rlist_create(&gc.checkpoints); gc_tree_new(&gc.consumers); fiber_cond_create(&gc.cleanup_cond); + checkpoint_schedule_cfg(&gc.checkpoint_schedule, 0, 0); gc.cleanup_fiber = fiber_new("gc", gc_cleanup_fiber_f); if (gc.cleanup_fiber == NULL) panic("failed to start garbage collection fiber"); + gc.checkpoint_fiber = fiber_new("checkpoint_daemon", + gc_checkpoint_fiber_f); + if (gc.checkpoint_fiber == NULL) + panic("failed to start checkpoint daemon fiber"); + fiber_start(gc.cleanup_fiber); + fiber_start(gc.checkpoint_fiber); } void @@ -294,6 +306,18 @@ gc_set_min_checkpoint_count(int min_checkpoint_count) } void +gc_set_checkpoint_interval(double interval) +{ + /* + * Reconfigure the schedule and wake up the checkpoint + * daemon so that it can readjust. 
+ */ + checkpoint_schedule_cfg(&gc.checkpoint_schedule, + ev_monotonic_now(loop()), interval); + fiber_wakeup(gc.checkpoint_fiber); +} + +void gc_add_checkpoint(const struct vclock *vclock) { struct gc_checkpoint *last_checkpoint = gc_last_checkpoint(); @@ -327,16 +351,13 @@ gc_add_checkpoint(const struct vclock *vclock) gc_schedule_cleanup(); } -int -gc_checkpoint(void) +static int +gc_do_checkpoint(void) { int rc; struct vclock vclock; - if (gc.checkpoint_is_in_progress) { - diag_set(ClientError, ER_CHECKPOINT_IN_PROGRESS); - return -1; - } + assert(!gc.checkpoint_is_in_progress); gc.checkpoint_is_in_progress = true; /* @@ -371,6 +392,27 @@ out: latch_unlock(&schema_lock); gc.checkpoint_is_in_progress = false; + return rc; +} + +int +gc_checkpoint(void) +{ + if (gc.checkpoint_is_in_progress) { + diag_set(ClientError, ER_CHECKPOINT_IN_PROGRESS); + return -1; + } + + /* + * Reset the schedule and wake up the checkpoint daemon + * so that it can readjust. + */ + checkpoint_schedule_reset(&gc.checkpoint_schedule, + ev_monotonic_now(loop())); + fiber_wakeup(gc.checkpoint_fiber); + + if (gc_do_checkpoint() != 0) + return -1; /* * Wait for background garbage collection that might @@ -380,10 +422,55 @@ out: * time box.snapshot() returns, all outdated checkpoint * files have been removed. */ - if (rc == 0) - gc_wait_cleanup(); + gc_wait_cleanup(); + return 0; +} - return rc; +static int +gc_checkpoint_fiber_f(va_list ap) +{ + (void)ap; + + /* + * Make the fiber non-cancellable so as not to bother + * about spurious wakeups. 
+ */ + fiber_set_cancellable(false); + + struct checkpoint_schedule *sched = &gc.checkpoint_schedule; + while (!fiber_is_cancelled()) { + double timeout = checkpoint_schedule_timeout(sched, + ev_monotonic_now(loop())); + if (timeout > 0) { + char buf[128]; + struct tm tm; + time_t time = (time_t)(ev_now(loop()) + timeout); + localtime_r(&time, &tm); + strftime(buf, sizeof(buf), "%c", &tm); + say_info("scheduled next checkpoint for %s", buf); + } else { + /* Periodic checkpointing is disabled. */ + timeout = TIMEOUT_INFINITY; + } + if (!fiber_yield_timeout(timeout)) { + /* + * The checkpoint schedule has changed. + * Reschedule the next checkpoint. + */ + continue; + } + /* Time to make the next scheduled checkpoint. */ + if (gc.checkpoint_is_in_progress) { + /* + * Another fiber is making a checkpoint. + * Skip this one. + */ + continue; + } + if (gc_do_checkpoint() != 0) + diag_log(); + } + return 0; } void diff --git a/src/box/gc.h b/src/box/gc.h index 15927726..ffbafd34 100644 --- a/src/box/gc.h +++ b/src/box/gc.h @@ -38,6 +38,7 @@ #include "fiber_cond.h" #include "vclock.h" #include "trivia/util.h" +#include "checkpoint_schedule.h" #if defined(__cplusplus) extern "C" { @@ -122,6 +123,10 @@ struct gc_state { struct rlist checkpoints; /** Registered consumers, linked by gc_consumer::node. */ gc_tree_t consumers; + /** Fiber responsible for periodic checkpointing. */ + struct fiber *checkpoint_fiber; + /** Schedule of periodic checkpoints. */ + struct checkpoint_schedule checkpoint_schedule; /** Fiber that removes old files in the background. */ struct fiber *cleanup_fiber; /** @@ -215,6 +220,13 @@ void gc_set_min_checkpoint_count(int min_checkpoint_count); /** + * Set the time interval between checkpoints, in seconds. + * Setting the interval to 0 disables periodic checkpointing. + */ +void +gc_set_checkpoint_interval(double interval); + +/** * Track an existing checkpoint in the garbage collector state. 
* Note, this function may trigger garbage collection to remove * old checkpoints. diff --git a/src/box/lua/cfg.cc b/src/box/lua/cfg.cc index c3825591..4f08c78e 100644 --- a/src/box/lua/cfg.cc +++ b/src/box/lua/cfg.cc @@ -165,6 +165,17 @@ lbox_cfg_set_checkpoint_count(struct lua_State *L) } static int +lbox_cfg_set_checkpoint_interval(struct lua_State *L) +{ + try { + box_set_checkpoint_interval(); + } catch (Exception *) { + luaT_error(L); + } + return 0; +} + +static int lbox_cfg_set_read_only(struct lua_State *L) { try { @@ -340,6 +351,7 @@ box_lua_cfg_init(struct lua_State *L) {"cfg_set_too_long_threshold", lbox_cfg_set_too_long_threshold}, {"cfg_set_snap_io_rate_limit", lbox_cfg_set_snap_io_rate_limit}, {"cfg_set_checkpoint_count", lbox_cfg_set_checkpoint_count}, + {"cfg_set_checkpoint_interval", lbox_cfg_set_checkpoint_interval}, {"cfg_set_read_only", lbox_cfg_set_read_only}, {"cfg_set_memtx_memory", lbox_cfg_set_memtx_memory}, {"cfg_set_memtx_max_tuple_size", lbox_cfg_set_memtx_max_tuple_size}, diff --git a/src/box/lua/checkpoint_daemon.lua b/src/box/lua/checkpoint_daemon.lua deleted file mode 100644 index 576c4a5c..00000000 --- a/src/box/lua/checkpoint_daemon.lua +++ /dev/null @@ -1,136 +0,0 @@ --- checkpoint_daemon.lua (internal file) - -local log = require 'log' -local fiber = require 'fiber' -local fio = require 'fio' -local yaml = require 'yaml' -local errno = require 'errno' -local digest = require 'digest' -local pickle = require 'pickle' - -local PREFIX = 'checkpoint_daemon' - -local daemon = { - checkpoint_interval = 0; - fiber = nil; - control = nil; -} - --- create snapshot, return true if no errors -local function snapshot() - log.info("making snapshot...") - local s, e = pcall(function() box.snapshot() end) - if s then - return true - end - -- don't complain in the log if the snapshot already exists - if errno() == errno.EEXIST then - return false - end - log.error("error while creating snapshot: %s", e) - return false -end - --- check filesystem 
and current time -local function process(self) - - if daemon.checkpoint_interval == nil then - return false - end - - if not(daemon.checkpoint_interval > 0) then - return false - end - - local checkpoints = box.info.gc().checkpoints - local last_checkpoint = checkpoints[#checkpoints] - - local last_snap = fio.pathjoin(box.cfg.memtx_dir, - string.format('%020d.snap', last_checkpoint.signature)) - local snstat = fio.stat(last_snap) - if snstat == nil then - log.error("can't stat %s: %s", last_snap, errno.strerror()) - return false - end - if snstat.mtime + daemon.checkpoint_interval <= fiber.time() then - return snapshot() - end -end - -local function daemon_fiber(self) - fiber.name(PREFIX, {truncate = true}) - log.info("started") - - -- - -- Add random offset to the initial period to avoid simultaneous - -- snapshotting when multiple instances of tarantool are running - -- on the same host. - -- See https://github.com/tarantool/tarantool/issues/732 - -- - local random = pickle.unpack('i', digest.urandom(4)) - local offset = random % self.checkpoint_interval - while true do - local period = self.checkpoint_interval + offset - -- maintain next_snapshot_time as a self member for testing purposes - self.next_snapshot_time = fiber.time() + period - log.info("scheduled the next snapshot at %s", - os.date("%c", self.next_snapshot_time)) - local msg = self.control:get(period) - if msg == 'shutdown' then - break - elseif msg == 'reload' then - offset = random % self.checkpoint_interval - log.info("reloaded") -- continue - elseif msg == nil and box.info.status == 'running' then - local s, e = pcall(process, self) - if not s then - log.error(e) - end - offset = 0 - end - end - self.next_snapshot_time = nil - log.info("stopped") -end - -local function reload(self) - if self.checkpoint_interval > 0 then - if self.control == nil then - -- Start daemon - self.control = fiber.channel() - self.fiber = fiber.create(daemon_fiber, self) - fiber.sleep(0) - else - -- Reload daemon - 
self.control:put("reload") - -- - -- channel:put() doesn't block the writer if there - -- is a ready reader. Give daemon fiber way so that - -- it can execute before reload() returns to the caller. - -- - fiber.sleep(0) - end - elseif self.control ~= nil then - -- Shutdown daemon - self.control:put("shutdown") - self.fiber = nil - self.control = nil - fiber.sleep(0) -- see comment above - end -end - -setmetatable(daemon, { - __index = { - set_checkpoint_interval = function() - daemon.checkpoint_interval = box.cfg.checkpoint_interval - reload(daemon) - return - end, - } -}) - -if box.internal == nil then - box.internal = { [PREFIX] = daemon } -else - box.internal[PREFIX] = daemon -end diff --git a/src/box/lua/init.c b/src/box/lua/init.c index ccb4c6a4..0e90f6be 100644 --- a/src/box/lua/init.c +++ b/src/box/lua/init.c @@ -65,7 +65,6 @@ extern char session_lua[], schema_lua[], load_cfg_lua[], xlog_lua[], - checkpoint_daemon_lua[], feedback_daemon_lua[], net_box_lua[], upgrade_lua[], @@ -75,7 +74,6 @@ static const char *lua_sources[] = { "box/session", session_lua, "box/tuple", tuple_lua, "box/schema", schema_lua, - "box/checkpoint_daemon", checkpoint_daemon_lua, "box/feedback_daemon", feedback_daemon_lua, "box/upgrade", upgrade_lua, "box/net_box", net_box_lua, diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua index 38e742c8..321fd3ad 100644 --- a/src/box/lua/load_cfg.lua +++ b/src/box/lua/load_cfg.lua @@ -227,7 +227,7 @@ local dynamic_cfg = { vinyl_cache = private.cfg_set_vinyl_cache, vinyl_timeout = private.cfg_set_vinyl_timeout, checkpoint_count = private.cfg_set_checkpoint_count, - checkpoint_interval = private.checkpoint_daemon.set_checkpoint_interval, + checkpoint_interval = private.cfg_set_checkpoint_interval, worker_pool_threads = private.cfg_set_worker_pool_threads, feedback_enabled = private.feedback_daemon.set_feedback_params, feedback_host = private.feedback_daemon.set_feedback_params, diff --git a/test/xlog/checkpoint_daemon.result 
b/test/xlog/checkpoint_daemon.result index 3a75137d..6c96da0d 100644 --- a/test/xlog/checkpoint_daemon.result +++ b/test/xlog/checkpoint_daemon.result @@ -16,6 +16,12 @@ test_run = env.new() test_run:cleanup_cluster() --- ... +default_checkpoint_count = box.cfg.checkpoint_count +--- +... +default_checkpoint_interval = box.cfg.checkpoint_interval +--- +... box.cfg{checkpoint_interval = 0} --- ... @@ -144,47 +150,6 @@ test_run:cmd("setopt delimiter ''"); --- - true ... --- restore default options -box.cfg{checkpoint_interval = 3600 * 4, checkpoint_count = 4 } ---- -... -space:drop() ---- -... -daemon = box.internal.checkpoint_daemon ---- -... --- stop daemon -box.cfg{ checkpoint_interval = 0 } ---- -... --- wait daemon to stop -while daemon.fiber ~= nil do fiber.sleep(0) end ---- -... -daemon.fiber == nil ---- -- true -... --- start daemon -box.cfg{ checkpoint_interval = 10 } ---- -... -daemon.fiber ~= nil ---- -- true -... --- reload configuration -box.cfg{ checkpoint_interval = 15, checkpoint_count = 20 } ---- -... -daemon.checkpoint_interval == 15 ---- -- true -... -daemon.checkpoint_count = 20 ---- -... -- Check that checkpoint_count can't be < 1. box.cfg{ checkpoint_count = 1 } --- @@ -198,101 +163,13 @@ box.cfg.checkpoint_count --- - 1 ... --- Start -PERIOD = 3600 ---- -... -box.cfg{ checkpoint_count = 2, checkpoint_interval = PERIOD} ---- -... -snapshot_time, time = daemon.next_snapshot_time, fiber.time() ---- -... -snapshot_time + 1 >= time + PERIOD or {snapshot_time, time, PERIOD} ---- -- true -... -snapshot_time - 1 <= time + 2 * PERIOD or {snapshot_time, time, PERIOD} ---- -- true -... -daemon_fiber = daemon.fiber ---- -... -daemon_control = daemon.control ---- -... --- Reload #1 -PERIOD = 100 +-- Restore default options. +box.cfg{checkpoint_count = default_checkpoint_count} --- ... -box.cfg{ checkpoint_count = 2, checkpoint_interval = PERIOD} +box.cfg{checkpoint_interval = default_checkpoint_interval} --- ... 
-snapshot_time, time = daemon.next_snapshot_time, fiber.time() ---- -... -snapshot_time + 1 >= time + PERIOD or {snapshot_time, time, PERIOD} ---- -- true -... -snapshot_time - 1 <= time + 2 * PERIOD or {snapshot_time, time, PERIOD} ---- -- true -... -daemon.fiber == daemon_fiber ---- -- true -... -daemon.control == daemon_control ---- -- true -... --- Reload #2 -PERIOD = 1000 ---- -... -box.cfg{ checkpoint_count = 2, checkpoint_interval = PERIOD} ---- -... -snapshot_time, time = daemon.next_snapshot_time, fiber.time() ---- -... -snapshot_time + 1 >= time + PERIOD or {snapshot_time, time, PERIOD} ---- -- true -... -snapshot_time - 1 <= time + 2 * PERIOD or {snapshot_time, time, PERIOD} ---- -- true -... -daemon.fiber == daemon_fiber ---- -- true -... -daemon.control == daemon_control ---- -- true -... -daemon_control = nil ---- -... -daemin_fiber = nil ---- -... --- Shutdown -box.cfg{ checkpoint_count = 2, checkpoint_interval = 0} ---- -... -daemon.next_snapshot_time ---- -- null -... -daemon.fiber == nil ---- -- true -... -daemon.control == nil +space:drop() --- -- true ... 
diff --git a/test/xlog/checkpoint_daemon.test.lua b/test/xlog/checkpoint_daemon.test.lua index f3490621..37d7f752 100644 --- a/test/xlog/checkpoint_daemon.test.lua +++ b/test/xlog/checkpoint_daemon.test.lua @@ -6,6 +6,8 @@ test_run = env.new() test_run:cleanup_cluster() +default_checkpoint_count = box.cfg.checkpoint_count +default_checkpoint_interval = box.cfg.checkpoint_interval box.cfg{checkpoint_interval = 0} PERIOD = jit.os == 'Linux' and 0.03 or 1.5 @@ -85,62 +87,13 @@ test_run:wait_cond(function() end, WAIT_COND_TIMEOUT); test_run:cmd("setopt delimiter ''"); --- restore default options -box.cfg{checkpoint_interval = 3600 * 4, checkpoint_count = 4 } -space:drop() - -daemon = box.internal.checkpoint_daemon --- stop daemon -box.cfg{ checkpoint_interval = 0 } --- wait daemon to stop -while daemon.fiber ~= nil do fiber.sleep(0) end -daemon.fiber == nil --- start daemon -box.cfg{ checkpoint_interval = 10 } -daemon.fiber ~= nil --- reload configuration -box.cfg{ checkpoint_interval = 15, checkpoint_count = 20 } -daemon.checkpoint_interval == 15 -daemon.checkpoint_count = 20 - -- Check that checkpoint_count can't be < 1. 
box.cfg{ checkpoint_count = 1 } box.cfg{ checkpoint_count = 0 } box.cfg.checkpoint_count --- Start -PERIOD = 3600 -box.cfg{ checkpoint_count = 2, checkpoint_interval = PERIOD} -snapshot_time, time = daemon.next_snapshot_time, fiber.time() -snapshot_time + 1 >= time + PERIOD or {snapshot_time, time, PERIOD} -snapshot_time - 1 <= time + 2 * PERIOD or {snapshot_time, time, PERIOD} - -daemon_fiber = daemon.fiber -daemon_control = daemon.control - --- Reload #1 -PERIOD = 100 -box.cfg{ checkpoint_count = 2, checkpoint_interval = PERIOD} -snapshot_time, time = daemon.next_snapshot_time, fiber.time() -snapshot_time + 1 >= time + PERIOD or {snapshot_time, time, PERIOD} -snapshot_time - 1 <= time + 2 * PERIOD or {snapshot_time, time, PERIOD} -daemon.fiber == daemon_fiber -daemon.control == daemon_control - --- Reload #2 -PERIOD = 1000 -box.cfg{ checkpoint_count = 2, checkpoint_interval = PERIOD} -snapshot_time, time = daemon.next_snapshot_time, fiber.time() -snapshot_time + 1 >= time + PERIOD or {snapshot_time, time, PERIOD} -snapshot_time - 1 <= time + 2 * PERIOD or {snapshot_time, time, PERIOD} -daemon.fiber == daemon_fiber -daemon.control == daemon_control - -daemon_control = nil -daemin_fiber = nil - --- Shutdown -box.cfg{ checkpoint_count = 2, checkpoint_interval = 0} -daemon.next_snapshot_time -daemon.fiber == nil -daemon.control == nil +-- Restore default options. +box.cfg{checkpoint_count = default_checkpoint_count} +box.cfg{checkpoint_interval = default_checkpoint_interval} + +space:drop() -- 2.11.0