[tarantool-patches] [PATCH 2/7] Refactoring: Track wal files using gc state.

Georgy Kirichenko georgy at tarantool.org
Tue Aug 13 09:27:40 MSK 2019


Move wal files tracking from wal into gc struct because current garbage
collection was complicated and illogical:
1. wal tracks xlogs ->
   relay catches on_close_log and sends events to gc ->
   gc tracks the least relay signature and sends vclock to wal
   wal collects logs
   gc collects checkpoints independently
2. in case of no space errors wal collects logs (making some checkpoints
     pointless) ->
   wal notifies gc
   gc deactivates outdated relays (this is a BUG because relay may catch
   the current state after deactivating)
This flow does not allow in memory replication because relay would not
have xlog boundaries yet.
The flow after the patch is more straightforward:
   wal informs gc about log open/close events
   gc tracks both checkpoints and logs
   relay catches on_close_log and sends events to gc (this will be
   changed after the in-memory replication patch)
   wal requests gc to free space in case of no space error
   gc could consistently free checkpoints and logs.

As relay notifies tx about ACK state changes gc would be able to track
the oldest used wal and perform garbage collection as well as relay
deactivation.
---
 src/box/box.cc                        |  25 ++-
 src/box/gc.c                          |  74 ++++++--
 src/box/gc.h                          |  32 +++-
 src/box/lua/info.c                    |  25 ++-
 src/box/memtx_engine.c                |   2 +-
 src/box/recovery.cc                   |   7 +-
 src/box/recovery.h                    |   3 +-
 src/box/vy_log.c                      |   4 +-
 src/box/wal.c                         | 262 ++++++++++++--------------
 src/box/wal.h                         |  23 +--
 src/box/xlog.c                        |  10 +-
 src/box/xlog.h                        |   2 +-
 test/replication/gc_no_space.result   |   4 +-
 test/replication/gc_no_space.test.lua |   4 +-
 14 files changed, 250 insertions(+), 227 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index 80249919e..7cc2cf83f 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1649,7 +1649,8 @@ box_process_vote(struct ballot *ballot)
 {
 	ballot->is_ro = cfg_geti("read_only") != 0;
 	vclock_copy(&ballot->vclock, &replicaset.vclock);
-	vclock_copy(&ballot->gc_vclock, &gc.vclock);
+	if (xdir_first_vclock(&gc.wal_dir, &ballot->gc_vclock) == -1)
+		vclock_copy(&ballot->gc_vclock, &replicaset.vclock);
 }
 
 /** Insert a new cluster into _schema */
@@ -1938,7 +1939,7 @@ local_recovery(const struct tt_uuid *instance_uuid,
 	 * so we must reflect this in replicaset vclock to
 	 * not attempt to apply these rows twice.
 	 */
-	recovery_scan(recovery, &replicaset.vclock, &gc.vclock);
+	recovery_scan(recovery, &replicaset.vclock);
 	say_info("instance vclock %s", vclock_to_string(&replicaset.vclock));
 
 	if (wal_dir_lock >= 0) {
@@ -2039,12 +2040,6 @@ tx_prio_cb(struct ev_loop *loop, ev_watcher *watcher, int events)
 	cbus_process(endpoint);
 }
 
-static void
-on_wal_garbage_collection(const struct vclock *vclock)
-{
-	gc_advance(vclock);
-}
-
 static void
 on_wal_checkpoint_threshold(void)
 {
@@ -2080,6 +2075,15 @@ box_is_configured(void)
 	return is_box_configured;
 }
 
+static void
+on_wal_log_action(enum wal_log_action action, const struct vclock *vclock)
+{
+	if ((action & WAL_LOG_CLOSE) != 0)
+		gc_close_log(vclock);
+	if ((action & WAL_LOG_OPEN) != 0)
+		gc_open_log(vclock);
+}
+
 static inline void
 box_cfg_xc(void)
 {
@@ -2093,7 +2097,7 @@ box_cfg_xc(void)
 	rmean_box = rmean_new(iproto_type_strs, IPROTO_TYPE_STAT_MAX);
 	rmean_error = rmean_new(rmean_error_strings, RMEAN_ERROR_LAST);
 
-	gc_init();
+	gc_init(cfg_gets("wal_dir"));
 	engine_init();
 	schema_init();
 	replication_init();
@@ -2105,7 +2109,8 @@ box_cfg_xc(void)
 	int64_t wal_max_size = box_check_wal_max_size(cfg_geti64("wal_max_size"));
 	enum wal_mode wal_mode = box_check_wal_mode(cfg_gets("wal_mode"));
 	if (wal_init(wal_mode, cfg_gets("wal_dir"), wal_max_rows,
-		     wal_max_size, &INSTANCE_UUID, on_wal_garbage_collection,
+		     wal_max_size, &INSTANCE_UUID,
+		     on_wal_log_action,
 		     on_wal_checkpoint_threshold) != 0) {
 		diag_raise();
 	}
diff --git a/src/box/gc.c b/src/box/gc.c
index a2c963e0a..944a6f3b2 100644
--- a/src/box/gc.c
+++ b/src/box/gc.c
@@ -57,6 +57,7 @@
 #include "engine.h"		/* engine_collect_garbage() */
 #include "wal.h"		/* wal_collect_garbage() */
 #include "checkpoint_schedule.h"
+#include "replication.h"
 
 struct gc_state gc;
 
@@ -103,13 +104,16 @@ gc_checkpoint_delete(struct gc_checkpoint *checkpoint)
 }
 
 void
-gc_init(void)
+gc_init(const char *wal_dir_name)
 {
 	/* Don't delete any files until recovery is complete. */
 	gc.min_checkpoint_count = INT_MAX;
 
-	vclock_create(&gc.vclock);
 	rlist_create(&gc.checkpoints);
+	xdir_create(&gc.wal_dir, wal_dir_name, XLOG, &INSTANCE_UUID,
+		    &xlog_opts_default);
+	xdir_scan(&gc.wal_dir);
+	gc.log_opened = false;
 	gc_tree_new(&gc.consumers);
 	fiber_cond_create(&gc.cleanup_cond);
 	checkpoint_schedule_cfg(&gc.checkpoint_schedule, 0, 0);
@@ -135,6 +139,7 @@ gc_free(void)
 	 * running when this function is called.
 	 */
 
+	xdir_destroy(&gc.wal_dir);
 	/* Free checkpoints. */
 	struct gc_checkpoint *checkpoint, *next_checkpoint;
 	rlist_foreach_entry_safe(checkpoint, &gc.checkpoints, in_checkpoints,
@@ -196,11 +201,10 @@ gc_run_cleanup(void)
 	if (vclock == NULL ||
 	    vclock_sum(vclock) > vclock_sum(&checkpoint->vclock))
 		vclock = &checkpoint->vclock;
-
-	if (vclock_sum(vclock) > vclock_sum(&gc.vclock)) {
-		vclock_copy(&gc.vclock, vclock);
-		run_wal_gc = true;
-	}
+	int cmp = vclock_compare(vclock, &replicaset.vclock);
+	if (gc.log_opened || !(cmp == 0 || cmp == 1))
+		vclock = vclockset_psearch(&gc.wal_dir.index, vclock);
+	run_wal_gc = vclock != NULL;
 
 	if (!run_engine_gc && !run_wal_gc)
 		return; /* nothing to do */
@@ -221,7 +225,8 @@ gc_run_cleanup(void)
 	if (run_engine_gc)
 		engine_collect_garbage(&checkpoint->vclock);
 	if (run_wal_gc)
-		wal_collect_garbage(vclock);
+		xdir_collect_garbage(&gc.wal_dir, vclock_sum(vclock),
+				     XDIR_GC_ASYNC);
 }
 
 static int
@@ -273,20 +278,32 @@ gc_wait_cleanup(void)
 		fiber_cond_wait(&gc.cleanup_cond);
 }
 
-void
-gc_advance(const struct vclock *vclock)
+int
+gc_force_wal_cleanup()
 {
-	/*
-	 * In case of emergency ENOSPC, the WAL thread may delete
-	 * WAL files needed to restore from backup checkpoints,
-	 * which would be kept by the garbage collector otherwise.
-	 * Bring the garbage collector vclock up to date.
-	 */
-	vclock_copy(&gc.vclock, vclock);
+	/* Get the last checkpoint vclock. */
+	struct gc_checkpoint *checkpoint;
+	checkpoint = rlist_last_entry(&gc.checkpoints,
+				      struct gc_checkpoint, in_checkpoints);
+	/* Check there are files before the last checkpoint. */
+	if (!xdir_has_garbage(&gc.wal_dir, vclock_sum(&checkpoint->vclock)))
+		return -1;
+
+	/* And delete the oldest. */
+	xdir_collect_garbage(&gc.wal_dir, vclock_sum(&checkpoint->vclock),
+			     XDIR_GC_ASYNC | XDIR_GC_REMOVE_ONE);
+
+	/* Get the oldest vclock. */
+	struct vclock vclock;
+	if (xdir_first_vclock(&gc.wal_dir, &vclock) < 0) {
+		/* Last wal was deleted, so use last checkpoint's vclock. */
+		vclock_copy(&vclock, &checkpoint->vclock);
+	}
 
+	/* Deactivate all consumers before the oldest vclock. */
 	struct gc_consumer *consumer = gc_tree_first(&gc.consumers);
 	while (consumer != NULL &&
-	       vclock_sum(&consumer->vclock) < vclock_sum(vclock)) {
+		vclock_sum(&consumer->vclock) < vclock_sum(&vclock)) {
 		struct gc_consumer *next = gc_tree_next(&gc.consumers,
 							consumer);
 		assert(!consumer->is_inactive);
@@ -299,6 +316,7 @@ gc_advance(const struct vclock *vclock)
 		consumer = next;
 	}
 	gc_schedule_cleanup();
+	return 0;
 }
 
 void
@@ -358,6 +376,26 @@ gc_add_checkpoint(const struct vclock *vclock)
 	gc_schedule_cleanup();
 }
 
+void
+gc_open_log(const struct vclock *vclock)
+{
+	struct vclock last_vclock;
+	if (xdir_last_vclock(&gc.wal_dir, &last_vclock) == -1 ||
+	    vclock_compare(vclock, &last_vclock) != 0)
+		xdir_add_vclock(&gc.wal_dir, vclock);
+	assert(!gc.log_opened);
+	gc.log_opened = true;
+}
+
+void
+gc_close_log(const struct vclock *vclock)
+{
+	(void) vclock;
+	assert(gc.log_opened);
+	gc.log_opened = false;
+	gc_schedule_cleanup();
+}
+
 static int
 gc_do_checkpoint(void)
 {
diff --git a/src/box/gc.h b/src/box/gc.h
index 5790ebcc6..9b38a0c06 100644
--- a/src/box/gc.h
+++ b/src/box/gc.h
@@ -39,6 +39,7 @@
 #include "vclock.h"
 #include "trivia/util.h"
 #include "checkpoint_schedule.h"
+#include "xlog.h"
 
 #if defined(__cplusplus)
 extern "C" {
@@ -103,8 +104,6 @@ typedef rb_tree(struct gc_consumer) gc_tree_t;
 
 /** Garbage collection state. */
 struct gc_state {
-	/** VClock of the oldest WAL row available on the instance. */
-	struct vclock vclock;
 	/**
 	 * Minimal number of checkpoints to preserve.
 	 * Configured by box.cfg.checkpoint_count.
@@ -121,6 +120,10 @@ struct gc_state {
 	 * to the tail. Linked by gc_checkpoint::in_checkpoints.
 	 */
 	struct rlist checkpoints;
+	/** Directory to track log files. */
+	struct xdir wal_dir;
+	/** True if log is opened. */
+	bool log_opened;
 	/** Registered consumers, linked by gc_consumer::node. */
 	gc_tree_t consumers;
 	/** Fiber responsible for periodic checkpointing. */
@@ -198,7 +201,7 @@ gc_last_checkpoint(void)
  * Initialize the garbage collection state.
  */
 void
-gc_init(void);
+gc_init(const char *wal_dir_name);
 
 /**
  * Destroy the garbage collection state.
@@ -206,13 +209,6 @@ gc_init(void);
 void
 gc_free(void);
 
-/**
- * Advance the garbage collector vclock to the given position.
- * Deactivate WAL consumers that need older data.
- */
-void
-gc_advance(const struct vclock *vclock);
-
 /**
  * Update the minimal number of checkpoints to preserve.
  * Called when box.cfg.checkpoint_count is updated.
@@ -239,6 +235,18 @@ gc_set_checkpoint_interval(double interval);
 void
 gc_add_checkpoint(const struct vclock *vclock);
 
+/**
+ * Tell gc a log was opened.
+ */
+void
+gc_open_log(const struct vclock *vclock);
+
+/**
+ * Tell gc a log was closed.
+ */
+void
+gc_close_log(const struct vclock *vclock);
+
 /**
  * Make a checkpoint.
  *
@@ -333,6 +341,10 @@ gc_consumer_iterator_init(struct gc_consumer_iterator *it)
 struct gc_consumer *
 gc_consumer_iterator_next(struct gc_consumer_iterator *it);
 
+/** Delete the oldest wal file before the last checkpoint. */
+int
+gc_force_wal_cleanup();
+
 #if defined(__cplusplus)
 } /* extern "C" */
 #endif /* defined(__cplusplus) */
diff --git a/src/box/lua/info.c b/src/box/lua/info.c
index d0e553b1d..62755a825 100644
--- a/src/box/lua/info.c
+++ b/src/box/lua/info.c
@@ -371,14 +371,6 @@ lbox_info_gc_call(struct lua_State *L)
 
 	lua_newtable(L);
 
-	lua_pushstring(L, "vclock");
-	lbox_pushvclock(L, &gc.vclock);
-	lua_settable(L, -3);
-
-	lua_pushstring(L, "signature");
-	luaL_pushint64(L, vclock_sum(&gc.vclock));
-	lua_settable(L, -3);
-
 	lua_pushstring(L, "checkpoint_is_in_progress");
 	lua_pushboolean(L, gc.checkpoint_is_in_progress);
 	lua_settable(L, -3);
@@ -386,9 +378,13 @@ lbox_info_gc_call(struct lua_State *L)
 	lua_pushstring(L, "checkpoints");
 	lua_newtable(L);
 
+	struct vclock *checkpoint_vclock = NULL;
+
 	count = 0;
 	struct gc_checkpoint *checkpoint;
 	gc_foreach_checkpoint(checkpoint) {
+		if (checkpoint_vclock == NULL)
+			checkpoint_vclock = &checkpoint->vclock;
 		lua_createtable(L, 0, 2);
 
 		lua_pushstring(L, "vclock");
@@ -416,12 +412,15 @@ lbox_info_gc_call(struct lua_State *L)
 	lua_pushstring(L, "consumers");
 	lua_newtable(L);
 
+	struct vclock *gc_vclock = NULL;
 	struct gc_consumer_iterator consumers;
 	gc_consumer_iterator_init(&consumers);
 
 	count = 0;
 	struct gc_consumer *consumer;
 	while ((consumer = gc_consumer_iterator_next(&consumers)) != NULL) {
+		if (gc_vclock == NULL)
+			gc_vclock = &consumer->vclock;
 		lua_createtable(L, 0, 3);
 
 		lua_pushstring(L, "name");
@@ -439,7 +438,17 @@ lbox_info_gc_call(struct lua_State *L)
 		lua_rawseti(L, -2, ++count);
 	}
 	lua_settable(L, -3);
+	if (gc_vclock == NULL ||
+	    vclock_sum(gc_vclock) > vclock_sum(checkpoint_vclock))
+		gc_vclock = checkpoint_vclock;
+
+	lua_pushstring(L, "vclock");
+	lbox_pushvclock(L, gc_vclock);
+	lua_settable(L, -3);
 
+	lua_pushstring(L, "signature");
+	luaL_pushint64(L, vclock_sum(gc_vclock));
+	lua_settable(L, -3);
 	return 1;
 }
 
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index 59ad16823..9b3f2233e 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -568,7 +568,7 @@ checkpoint_f(va_list ap)
 	}
 
 	struct xlog snap;
-	if (xdir_create_xlog(&ckpt->dir, &snap, &ckpt->vclock) != 0)
+	if (xdir_create_xlog(&ckpt->dir, &snap, &ckpt->vclock, NULL) != 0)
 		return -1;
 
 	say_info("saving snapshot `%s'", snap.filename);
diff --git a/src/box/recovery.cc b/src/box/recovery.cc
index d122d618a..45c4f6820 100644
--- a/src/box/recovery.cc
+++ b/src/box/recovery.cc
@@ -118,22 +118,17 @@ recovery_new(const char *wal_dirname, bool force_recovery,
 }
 
 void
-recovery_scan(struct recovery *r, struct vclock *end_vclock,
-	      struct vclock *gc_vclock)
+recovery_scan(struct recovery *r, struct vclock *end_vclock)
 {
 	xdir_scan_xc(&r->wal_dir);
 
 	if (xdir_last_vclock(&r->wal_dir, end_vclock) < 0 ||
 	    vclock_compare(end_vclock, &r->vclock) < 0) {
 		/* No xlogs after last checkpoint. */
-		vclock_copy(gc_vclock, &r->vclock);
 		vclock_copy(end_vclock, &r->vclock);
 		return;
 	}
 
-	if (xdir_first_vclock(&r->wal_dir, gc_vclock) < 0)
-		unreachable();
-
 	/* Scan the last xlog to find end vclock. */
 	struct xlog_cursor cursor;
 	if (xdir_open_cursor(&r->wal_dir, vclock_sum(end_vclock), &cursor) != 0)
diff --git a/src/box/recovery.h b/src/box/recovery.h
index 6e68abc0b..2a03cfc2f 100644
--- a/src/box/recovery.h
+++ b/src/box/recovery.h
@@ -75,8 +75,7 @@ recovery_delete(struct recovery *r);
  * WAL directory.
  */
 void
-recovery_scan(struct recovery *r,  struct vclock *end_vclock,
-	      struct vclock *gc_vclock);
+recovery_scan(struct recovery *r, struct vclock *end_vclock);
 
 void
 recovery_follow_local(struct recovery *r, struct xstream *stream,
diff --git a/src/box/vy_log.c b/src/box/vy_log.c
index cb291f3c8..d97eff9b8 100644
--- a/src/box/vy_log.c
+++ b/src/box/vy_log.c
@@ -949,7 +949,7 @@ vy_log_open(struct xlog *xlog)
 	}
 
 	if (xdir_create_xlog(&vy_log.dir, xlog,
-			     &vy_log.last_checkpoint) < 0)
+			     &vy_log.last_checkpoint, NULL) < 0)
 		goto fail;
 
 	struct xrow_header row;
@@ -2585,7 +2585,7 @@ vy_log_create(const struct vclock *vclock, struct vy_recovery *recovery)
 	rlist_foreach_entry(lsm, &recovery->lsms, in_recovery) {
 		/* Create the log file on the first write. */
 		if (!xlog_is_open(&xlog) &&
-		    xdir_create_xlog(&vy_log.dir, &xlog, vclock) != 0)
+		    xdir_create_xlog(&vy_log.dir, &xlog, vclock, NULL) != 0)
 			goto err_create_xlog;
 
 		if (vy_log_append_lsm(&xlog, lsm) != 0)
diff --git a/src/box/wal.c b/src/box/wal.c
index 5d8dcc4f7..a09ab7187 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -43,6 +43,7 @@
 #include "cbus.h"
 #include "coio_task.h"
 #include "replication.h"
+#include "gc.h"
 
 enum {
 	/**
@@ -78,7 +79,7 @@ struct wal_writer
 {
 	struct journal base;
 	/* ----------------- tx ------------------- */
-	wal_on_garbage_collection_f on_garbage_collection;
+	wal_on_log_action_f on_log_action;
 	wal_on_checkpoint_threshold_f on_checkpoint_threshold;
 	/**
 	 * The rollback queue. An accumulator for all requests
@@ -123,12 +124,7 @@ struct wal_writer
 	 * with this LSN and LSN becomes "real".
 	 */
 	struct vclock vclock;
-	/**
-	 * VClock of the most recent successfully created checkpoint.
-	 * The WAL writer must not delete WAL files that are needed to
-	 * recover from it even if it is running out of disk space.
-	 */
-	struct vclock checkpoint_vclock;
+	struct vclock prev_vclock;
 	/** Total size of WAL files written since the last checkpoint. */
 	int64_t checkpoint_wal_size;
 	/**
@@ -341,28 +337,6 @@ tx_schedule_rollback(struct cmsg *msg)
 			     container_of(msg, struct wal_msg, base));
 }
 
-
-/**
- * This message is sent from WAL to TX when the WAL thread hits
- * ENOSPC and has to delete some backup WAL files to continue.
- * The TX thread uses this message to shoot off WAL consumers
- * that needed deleted WAL files.
- */
-struct tx_notify_gc_msg {
-	struct cmsg base;
-	/** VClock of the oldest WAL row preserved by WAL. */
-	struct vclock vclock;
-};
-
-static void
-tx_notify_gc(struct cmsg *msg)
-{
-	struct wal_writer *writer = &wal_writer_singleton;
-	struct vclock *vclock = &((struct tx_notify_gc_msg *)msg)->vclock;
-	writer->on_garbage_collection(vclock);
-	free(msg);
-}
-
 static void
 tx_notify_checkpoint(struct cmsg *msg)
 {
@@ -380,7 +354,7 @@ static void
 wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
 		  const char *wal_dirname, int64_t wal_max_rows,
 		  int64_t wal_max_size, const struct tt_uuid *instance_uuid,
-		  wal_on_garbage_collection_f on_garbage_collection,
+		  wal_on_log_action_f on_log_action,
 		  wal_on_checkpoint_threshold_f on_checkpoint_threshold)
 {
 	writer->wal_mode = wal_mode;
@@ -404,10 +378,10 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
 	writer->checkpoint_triggered = false;
 
 	vclock_create(&writer->vclock);
-	vclock_create(&writer->checkpoint_vclock);
+	vclock_clear(&writer->prev_vclock);
 	rlist_create(&writer->watchers);
 
-	writer->on_garbage_collection = on_garbage_collection;
+	writer->on_log_action = on_log_action;
 	writer->on_checkpoint_threshold = on_checkpoint_threshold;
 
 	mempool_create(&writer->msg_pool, &cord()->slabc,
@@ -428,6 +402,40 @@ wal_writer_destroy(struct wal_writer *writer)
 static int
 wal_cord_f(va_list ap);
 
+struct tx_log_action_msg {
+	struct cmsg base;
+	enum wal_log_action action;
+	struct vclock vclock;
+};
+
+static void
+tx_on_log_action(struct cmsg *base)
+{
+	struct wal_writer *writer = &wal_writer_singleton;
+	struct tx_log_action_msg *msg =
+		container_of(base, struct tx_log_action_msg, base);
+	writer->on_log_action(msg->action, &msg->vclock);
+	free(msg);
+}
+
+/* Notify the TX thread that WAL opened or closed a log file. */
+static inline void
+wal_notify_log_action(struct wal_writer *writer, enum wal_log_action action)
+{
+	struct tx_log_action_msg *msg = malloc(sizeof(*msg));
+	if (msg == NULL) {
+		say_error("Failed to allocate log close message");
+		return;
+	}
+	static struct cmsg_hop route[] = {
+		{ tx_on_log_action, NULL },
+	};
+	cmsg_init(&msg->base, route);
+	msg->action = action;
+	vclock_copy(&msg->vclock, &writer->vclock);
+	cpipe_push(&writer->tx_prio_pipe, &msg->base);
+}
+
 static int
 wal_open_f(struct cbus_call_msg *msg)
 {
@@ -436,7 +444,10 @@ wal_open_f(struct cbus_call_msg *msg)
 	const char *path = xdir_format_filename(&writer->wal_dir,
 				vclock_sum(&writer->vclock), NONE);
 	assert(!xlog_is_open(&writer->current_wal));
-	return xlog_open(&writer->current_wal, path, &writer->wal_dir.opts);
+	int rc = xlog_open(&writer->current_wal, path, &writer->wal_dir.opts);
+	if (rc == 0)
+		wal_notify_log_action(writer, WAL_LOG_OPEN);
+	return rc;
 }
 
 /**
@@ -500,7 +511,7 @@ wal_open(struct wal_writer *writer)
 int
 wal_init(enum wal_mode wal_mode, const char *wal_dirname, int64_t wal_max_rows,
 	 int64_t wal_max_size, const struct tt_uuid *instance_uuid,
-	 wal_on_garbage_collection_f on_garbage_collection,
+	 wal_on_log_action_f on_log_action,
 	 wal_on_checkpoint_threshold_f on_checkpoint_threshold)
 {
 	assert(wal_max_rows > 1);
@@ -508,7 +519,7 @@ wal_init(enum wal_mode wal_mode, const char *wal_dirname, int64_t wal_max_rows,
 	/* Initialize the state. */
 	struct wal_writer *writer = &wal_writer_singleton;
 	wal_writer_create(writer, wal_mode, wal_dirname, wal_max_rows,
-			  wal_max_size, instance_uuid, on_garbage_collection,
+			  wal_max_size, instance_uuid, on_log_action,
 			  on_checkpoint_threshold);
 
 	/* Start WAL thread. */
@@ -528,12 +539,9 @@ wal_enable(void)
 
 	/* Initialize the writer vclock from the recovery state. */
 	vclock_copy(&writer->vclock, &replicaset.vclock);
+	if (vclock_sum(&replicaset.vclock) > 0)
+		vclock_copy(&writer->prev_vclock, &replicaset.vclock);
 
-	/*
-	 * Scan the WAL directory to build an index of all
-	 * existing WAL files. Required for garbage collection,
-	 * see wal_collect_garbage().
-	 */
 	if (xdir_scan(&writer->wal_dir))
 		return -1;
 
@@ -590,8 +598,8 @@ wal_begin_checkpoint_f(struct wal_msg *wal_msg)
 	if (xlog_is_open(&writer->current_wal) &&
 	    vclock_sum(&writer->current_wal.meta.vclock) !=
 	    vclock_sum(&writer->vclock)) {
-
 		xlog_close(&writer->current_wal, false);
+		wal_notify_log_action(writer, WAL_LOG_CLOSE);
 		/*
 		 * The next WAL will be created on the first write.
 		 */
@@ -662,7 +670,6 @@ wal_commit_checkpoint_f(struct cbus_call_msg *data)
 	 * when checkpointing started from the current value
 	 * rather than just setting it to 0.
 	 */
-	vclock_copy(&writer->checkpoint_vclock, &msg->vclock);
 	assert(writer->checkpoint_wal_size >= msg->wal_size);
 	writer->checkpoint_wal_size -= msg->wal_size;
 	writer->checkpoint_triggered = false;
@@ -673,10 +680,8 @@ void
 wal_commit_checkpoint(struct wal_checkpoint *checkpoint)
 {
 	struct wal_writer *writer = &wal_writer_singleton;
-	if (writer->wal_mode == WAL_NONE) {
-		vclock_copy(&writer->checkpoint_vclock, &checkpoint->vclock);
+	if (writer->wal_mode == WAL_NONE)
 		return;
-	}
 	bool cancellable = fiber_set_cancellable(false);
 	cbus_call(&writer->wal_pipe, &writer->tx_prio_pipe,
 		  &checkpoint->base, wal_commit_checkpoint_f, NULL,
@@ -714,54 +719,6 @@ wal_set_checkpoint_threshold(int64_t threshold)
 	fiber_set_cancellable(cancellable);
 }
 
-struct wal_gc_msg
-{
-	struct cbus_call_msg base;
-	const struct vclock *vclock;
-};
-
-static int
-wal_collect_garbage_f(struct cbus_call_msg *data)
-{
-	struct wal_writer *writer = &wal_writer_singleton;
-	const struct vclock *vclock = ((struct wal_gc_msg *)data)->vclock;
-
-	if (!xlog_is_open(&writer->current_wal) &&
-	    vclock_sum(vclock) >= vclock_sum(&writer->vclock)) {
-		/*
-		 * The last available WAL file has been sealed and
-		 * all registered consumers have done reading it.
-		 * We can delete it now.
-		 */
-	} else {
-		/*
-		 * Find the most recent WAL file that contains rows
-		 * required by registered consumers and delete all
-		 * older WAL files.
-		 */
-		vclock = vclockset_psearch(&writer->wal_dir.index, vclock);
-	}
-	if (vclock != NULL)
-		xdir_collect_garbage(&writer->wal_dir, vclock_sum(vclock),
-				     XDIR_GC_ASYNC);
-
-	return 0;
-}
-
-void
-wal_collect_garbage(const struct vclock *vclock)
-{
-	struct wal_writer *writer = &wal_writer_singleton;
-	if (writer->wal_mode == WAL_NONE)
-		return;
-	struct wal_gc_msg msg;
-	msg.vclock = vclock;
-	bool cancellable = fiber_set_cancellable(false);
-	cbus_call(&writer->wal_pipe, &writer->tx_prio_pipe, &msg.base,
-		  wal_collect_garbage_f, NULL, TIMEOUT_INFINITY);
-	fiber_set_cancellable(cancellable);
-}
-
 static void
 wal_notify_watchers(struct wal_writer *writer, unsigned events);
 
@@ -789,6 +746,7 @@ wal_opt_rotate(struct wal_writer *writer)
 	if (xlog_is_open(&writer->current_wal) &&
 	    (writer->current_wal.rows >= writer->wal_max_rows ||
 	     writer->current_wal.offset >= writer->wal_max_size)) {
+		wal_notify_log_action(writer, WAL_LOG_CLOSE);
 		/*
 		 * We can not handle xlog_close()
 		 * failure in any reasonable way.
@@ -801,20 +759,69 @@ wal_opt_rotate(struct wal_writer *writer)
 		return 0;
 
 	if (xdir_create_xlog(&writer->wal_dir, &writer->current_wal,
-			     &writer->vclock) != 0) {
+			     &writer->vclock, &writer->prev_vclock) != 0) {
 		diag_log();
 		return -1;
 	}
-	/*
-	 * Keep track of the new WAL vclock. Required for garbage
-	 * collection, see wal_collect_garbage().
-	 */
-	xdir_add_vclock(&writer->wal_dir, &writer->vclock);
+	vclock_copy(&writer->prev_vclock, &writer->vclock);
 
+	wal_notify_log_action(writer, WAL_LOG_OPEN);
 	wal_notify_watchers(writer, WAL_EVENT_ROTATE);
 	return 0;
 }
 
+struct gc_force_wal_msg {
+	struct cmsg base;
+	struct fiber_cond done_cond;
+	bool done;
+	int rc;
+};
+
+static void
+wal_gc_wal_force_done(struct cmsg *base)
+{
+	struct gc_force_wal_msg *msg = container_of(base,
+						   struct gc_force_wal_msg,
+						   base);
+	msg->done = true;
+	fiber_cond_signal(&msg->done_cond);
+}
+
+static int
+tx_gc_force_wal_f(va_list ap)
+{
+	struct wal_writer *writer = &wal_writer_singleton;
+	struct gc_force_wal_msg *msg = va_arg(ap, struct gc_force_wal_msg *);
+	static struct cmsg_hop respond_route[] = {
+		{wal_gc_wal_force_done, NULL}
+	};
+	msg->rc = gc_force_wal_cleanup();
+	cmsg_init(&msg->base, respond_route);
+	cpipe_push(&writer->wal_pipe, &msg->base);
+	return 0;
+}
+
+void
+tx_gc_wal_force(struct cmsg *base)
+{
+	struct wal_writer *writer = &wal_writer_singleton;
+	struct gc_force_wal_msg *msg = container_of(base,
+						   struct gc_force_wal_msg,
+						   base);
+	struct fiber *gc_fiber = fiber_new("wal_gc_fiber", tx_gc_force_wal_f);
+	if (gc_fiber == NULL) {
+		struct cmsg_hop respond_route[] = {
+			{wal_gc_wal_force_done, NULL}
+		};
+		msg->rc = -1;
+		cmsg_init(&msg->base, respond_route);
+		cpipe_push(&writer->wal_pipe, &msg->base);
+		return;
+	}
+	fiber_start(gc_fiber, msg);
+	return;
+}
+
 /**
  * Make sure there's enough disk space to append @len bytes
  * of data to the current WAL.
@@ -825,17 +832,10 @@ wal_opt_rotate(struct wal_writer *writer)
 static int
 wal_fallocate(struct wal_writer *writer, size_t len)
 {
-	bool warn_no_space = true, notify_gc = false;
 	struct xlog *l = &writer->current_wal;
 	struct errinj *errinj = errinj(ERRINJ_WAL_FALLOCATE, ERRINJ_INT);
 	int rc = 0;
 
-	/*
-	 * Max LSN that can be collected in case of ENOSPC -
-	 * we must not delete WALs necessary for recovery.
-	 */
-	int64_t gc_lsn = vclock_sum(&writer->checkpoint_vclock);
-
 	/*
 	 * The actual write size can be greater than the sum size
 	 * of encoded rows (compression, fixheaders). Double the
@@ -856,45 +856,23 @@ retry:
 	}
 	if (errno != ENOSPC)
 		goto error;
-	if (!xdir_has_garbage(&writer->wal_dir, gc_lsn))
-		goto error;
-
-	if (warn_no_space) {
-		say_crit("ran out of disk space, try to delete old WAL files");
-		warn_no_space = false;
-	}
-
-	xdir_collect_garbage(&writer->wal_dir, gc_lsn, XDIR_GC_REMOVE_ONE);
-	notify_gc = true;
-	goto retry;
+	static struct cmsg_hop gc_wal_force_route[] = {
+		{tx_gc_wal_force, NULL}
+	};
+	struct gc_force_wal_msg msg;
+	msg.done = false;
+	fiber_cond_create(&msg.done_cond);
+	cmsg_init(&msg.base, gc_wal_force_route);
+	cpipe_push(&writer->tx_prio_pipe, &msg.base);
+
+	while (!msg.done)
+		fiber_cond_wait(&msg.done_cond);
+	if (msg.rc == 0)
+		goto retry;
 error:
 	diag_log();
 	rc = -1;
 out:
-	/*
-	 * Notify the TX thread if the WAL thread had to delete
-	 * some WAL files to proceed so that TX can shoot off WAL
-	 * consumers that still need those files.
-	 *
-	 * We allocate the message with malloc() and we ignore
-	 * allocation failures, because this is a pretty rare
-	 * event and a failure to send this message isn't really
-	 * critical.
-	 */
-	if (notify_gc) {
-		static struct cmsg_hop route[] = {
-			{ tx_notify_gc, NULL },
-		};
-		struct tx_notify_gc_msg *msg = malloc(sizeof(*msg));
-		if (msg != NULL) {
-			if (xdir_first_vclock(&writer->wal_dir,
-					      &msg->vclock) < 0)
-				vclock_copy(&msg->vclock, &writer->vclock);
-			cmsg_init(&msg->base, route);
-			cpipe_push(&writer->tx_prio_pipe, &msg->base);
-		} else
-			say_warn("failed to allocate gc notification message");
-	}
 	return rc;
 }
 
@@ -1204,7 +1182,7 @@ wal_cord_f(va_list ap)
 			    &writer->current_wal.meta.vclock) > 0)) {
 		struct xlog l;
 		if (xdir_create_xlog(&writer->wal_dir, &l,
-				     &writer->vclock) == 0)
+				     &writer->vclock, &writer->prev_vclock) == 0)
 			xlog_close(&l, false);
 		else
 			diag_log();
diff --git a/src/box/wal.h b/src/box/wal.h
index 4e500d2a3..1a7156d97 100644
--- a/src/box/wal.h
+++ b/src/box/wal.h
@@ -52,14 +52,16 @@ extern int wal_dir_lock;
 extern "C" {
 #endif /* defined(__cplusplus) */
 
+enum wal_log_action {
+	WAL_LOG_OPEN		= (1 << 0),
+	WAL_LOG_CLOSE		= (1 << 1)
+};
+
 /**
- * Callback invoked in the TX thread when the WAL thread runs out
- * of disk space and has to delete some old WAL files to continue.
- * It is supposed to shoot off WAL consumers that need the deleted
- * files. The vclock of the oldest WAL row still available on the
- * instance is passed in @vclock.
+ * Callback invoked in the TX thread when WAL handles a log file.
  */
-typedef void (*wal_on_garbage_collection_f)(const struct vclock *vclock);
+typedef void (*wal_on_log_action_f)(enum wal_log_action acion,
+				    const struct vclock *vclock);
 
 /**
  * Callback invoked in the TX thread when the total size of WAL
@@ -74,7 +76,7 @@ typedef void (*wal_on_checkpoint_threshold_f)(void);
 int
 wal_init(enum wal_mode wal_mode, const char *wal_dirname, int64_t wal_max_rows,
 	 int64_t wal_max_size, const struct tt_uuid *instance_uuid,
-	 wal_on_garbage_collection_f on_garbage_collection,
+	 wal_on_log_action_f on_log_action,
 	 wal_on_checkpoint_threshold_f on_checkpoint_threshold);
 
 /**
@@ -218,13 +220,6 @@ wal_commit_checkpoint(struct wal_checkpoint *checkpoint);
 void
 wal_set_checkpoint_threshold(int64_t threshold);
 
-/**
- * Remove WAL files that are not needed by consumers reading
- * rows at @vclock or newer.
- */
-void
-wal_collect_garbage(const struct vclock *vclock);
-
 void
 wal_init_vy_log();
 
diff --git a/src/box/xlog.c b/src/box/xlog.c
index 8254cce20..54a7b5246 100644
--- a/src/box/xlog.c
+++ b/src/box/xlog.c
@@ -965,20 +965,12 @@ xdir_touch_xlog(struct xdir *dir, const struct vclock *vclock)
  */
 int
 xdir_create_xlog(struct xdir *dir, struct xlog *xlog,
-		 const struct vclock *vclock)
+		 const struct vclock *vclock, const struct vclock *prev_vclock)
 {
 	int64_t signature = vclock_sum(vclock);
 	assert(signature >= 0);
 	assert(!tt_uuid_is_nil(dir->instance_uuid));
 
-	/*
-	 * For WAL dir: store vclock of the previous xlog file
-	 * to check for gaps on recovery.
-	 */
-	const struct vclock *prev_vclock = NULL;
-	if (dir->type == XLOG && !vclockset_empty(&dir->index))
-		prev_vclock = vclockset_last(&dir->index);
-
 	struct xlog_meta meta;
 	xlog_meta_create(&meta, dir->filetype, dir->instance_uuid,
 			 vclock, prev_vclock);
diff --git a/src/box/xlog.h b/src/box/xlog.h
index a48b05fc4..6a71e0cbd 100644
--- a/src/box/xlog.h
+++ b/src/box/xlog.h
@@ -424,7 +424,7 @@ xdir_touch_xlog(struct xdir *dir, const struct vclock *vclock);
  */
 int
 xdir_create_xlog(struct xdir *dir, struct xlog *xlog,
-		 const struct vclock *vclock);
+		 const struct vclock *vclock, const struct vclock *prev_vclock);
 
 /**
  * Create new xlog writer based on fd.
diff --git a/test/replication/gc_no_space.result b/test/replication/gc_no_space.result
index e860ab00f..bcf175282 100644
--- a/test/replication/gc_no_space.result
+++ b/test/replication/gc_no_space.result
@@ -250,7 +250,7 @@ gc = box.info.gc()
 ---
 - 2
 ...
-gc.signature == gc.checkpoints[2].signature
+gc.signature == gc.checkpoints[1].signature
 ---
 - true
 ...
@@ -272,7 +272,7 @@ gc = box.info.gc()
 ---
 - 2
 ...
-gc.signature == gc.checkpoints[2].signature
+gc.signature == gc.checkpoints[1].signature
 ---
 - true
 ...
diff --git a/test/replication/gc_no_space.test.lua b/test/replication/gc_no_space.test.lua
index 98ccd401b..bf4049fbb 100644
--- a/test/replication/gc_no_space.test.lua
+++ b/test/replication/gc_no_space.test.lua
@@ -106,7 +106,7 @@ check_snap_count(2)
 gc = box.info.gc()
 #gc.consumers -- 0
 #gc.checkpoints -- 2
-gc.signature == gc.checkpoints[2].signature
+gc.signature == gc.checkpoints[1].signature
 
 s:drop()
 box.schema.user.revoke('guest', 'replication')
@@ -116,4 +116,4 @@ test_run:cleanup_cluster()
 test_run:cmd("restart server default")
 gc = box.info.gc()
 #gc.checkpoints -- 2
-gc.signature == gc.checkpoints[2].signature
+gc.signature == gc.checkpoints[1].signature
-- 
2.22.0





More information about the Tarantool-patches mailing list