[PATCH 5/9] gc: do not use WAL watcher API for deactivating stale consumers

Vladimir Davydov vdavydov.dev at gmail.com
Wed Nov 28 19:14:43 MSK 2018


The WAL thread may delete old WAL files if it gets an ENOSPC error.
Currently, we use the WAL watcher API to notify the TX thread about it
so that it can shoot off stale replicas. This looks ugly, because the
WAL watcher API was initially designed to propagate WAL changes to
relay threads, and the new event WAL_EVENT_GC, which was introduced for
notifying about ENOSPC-driven garbage collection, isn't used anywhere
else. Besides, there's already a pipe from WAL to TX - we could reuse it
instead of opening another one.

If we continued down that path, then in order to trigger a checkpoint
from the WAL thread (see #1082), we would have to introduce yet another
esoteric WAL watcher event, making the whole design look even uglier.
That said, let's rewrite the garbage collection notification procedure
using a plain callback instead of abusing the WAL watcher API.
---
 src/box/box.cc |  9 +++++++--
 src/box/gc.c   | 33 ++++-----------------------------
 src/box/gc.h   | 13 +++++++------
 src/box/wal.c  | 46 ++++++++++++++++++++++++++++++++--------------
 src/box/wal.h  | 19 +++++++++++--------
 5 files changed, 61 insertions(+), 59 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index bb7c1bb9..20412af4 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -2011,6 +2011,12 @@ tx_prio_cb(struct ev_loop *loop, ev_watcher *watcher, int events)
 	cbus_process(endpoint);
 }
 
+static void
+on_wal_garbage_collection(const struct vclock *vclock)
+{
+	gc_advance(vclock);
+}
+
 void
 box_init(void)
 {
@@ -2125,10 +2131,9 @@ box_cfg_xc(void)
 	enum wal_mode wal_mode = box_check_wal_mode(cfg_gets("wal_mode"));
 	if (wal_init(wal_mode, cfg_gets("wal_dir"), wal_max_rows,
 		     wal_max_size, &INSTANCE_UUID, &replicaset.vclock,
-		     &checkpoint->vclock) != 0) {
+		     &checkpoint->vclock, on_wal_garbage_collection) != 0) {
 		diag_raise();
 	}
-	gc_set_wal_watcher();
 
 	rmean_cleanup(rmean_box);
 
diff --git a/src/box/gc.c b/src/box/gc.c
index 9c049977..87273b8d 100644
--- a/src/box/gc.c
+++ b/src/box/gc.c
@@ -113,26 +113,6 @@ gc_init(void)
 	fiber_start(gc.fiber);
 }
 
-static void
-gc_process_wal_event(struct wal_watcher_msg *);
-
-void
-gc_set_wal_watcher(void)
-{
-	/*
-	 * Since the function is called from box_cfg() it is
-	 * important that we do not pass a message processing
-	 * callback to wal_set_watcher(). Doing so would cause
-	 * credentials corruption in the fiber executing
-	 * box_cfg() in case it processes some iproto messages.
-	 * Besides, by the time the function is called
-	 * tx_fiber_pool is already set up and it will process
-	 * all the messages directed to "tx" endpoint safely.
-	 */
-	wal_set_watcher(&gc.wal_watcher, "tx", gc_process_wal_event,
-			NULL, WAL_EVENT_GC);
-}
-
 void
 gc_free(void)
 {
@@ -270,25 +250,20 @@ gc_wait(void)
 		fiber_cond_wait(&gc.cond);
 }
 
-/**
- * Deactivate consumers that need files deleted by the WAL thread.
- */
-static void
-gc_process_wal_event(struct wal_watcher_msg *msg)
+void
+gc_advance(const struct vclock *vclock)
 {
-	assert((msg->events & WAL_EVENT_GC) != 0);
-
 	/*
 	 * In case of emergency ENOSPC, the WAL thread may delete
 	 * WAL files needed to restore from backup checkpoints,
 	 * which would be kept by the garbage collector otherwise.
 	 * Bring the garbage collector vclock up to date.
 	 */
-	vclock_copy(&gc.vclock, &msg->gc_vclock);
+	vclock_copy(&gc.vclock, vclock);
 
 	struct gc_consumer *consumer = gc_tree_first(&gc.consumers);
 	while (consumer != NULL &&
-	       vclock_sum(&consumer->vclock) < vclock_sum(&msg->gc_vclock)) {
+	       vclock_sum(&consumer->vclock) < vclock_sum(vclock)) {
 		struct gc_consumer *next = gc_tree_next(&gc.consumers,
 							consumer);
 		assert(!consumer->is_inactive);
diff --git a/src/box/gc.h b/src/box/gc.h
index 6e96d7bb..bcf7d212 100644
--- a/src/box/gc.h
+++ b/src/box/gc.h
@@ -192,12 +192,6 @@ void
 gc_init(void);
 
 /**
- * Set WAL watcher. Called after WAL is initialized.
- */
-void
-gc_set_wal_watcher(void);
-
-/**
  * Destroy the garbage collection state.
  */
 void
@@ -211,6 +205,13 @@ void
 gc_wait(void);
 
 /**
+ * Advance the garbage collector vclock to the given position.
+ * Deactivate WAL consumers that need older data.
+ */
+void
+gc_advance(const struct vclock *vclock);
+
+/**
  * Update the minimal number of checkpoints to preserve.
  * Called when box.cfg.checkpoint_count is updated.
  *
diff --git a/src/box/wal.c b/src/box/wal.c
index 3a73c5c5..c47535e4 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -91,6 +91,7 @@ struct wal_writer
 {
 	struct journal base;
 	/* ----------------- tx ------------------- */
+	wal_on_garbage_collection_f on_garbage_collection;
 	/**
 	 * The rollback queue. An accumulator for all requests
 	 * that need to be rolled back. Also acts as a valve
@@ -152,6 +153,17 @@ struct wal_msg {
 	 * be rolled back.
 	 */
 	struct stailq rollback;
+	/**
+	 * Set if the WAL thread ran out of disk space while
+	 * processing this request and had to delete some old
+	 * WAL files.
+	 */
+	bool gc_executed;
+	/**
+	 * VClock of the oldest WAL row available on the instance.
+	 * Initialized only if @gc_executed is set.
+	 */
+	struct vclock gc_vclock;
 };
 
 /**
@@ -188,6 +200,7 @@ wal_msg_create(struct wal_msg *batch)
 {
 	cmsg_init(&batch->base, wal_request_route);
 	batch->approx_len = 0;
+	batch->gc_executed = false;
 	stailq_create(&batch->commit);
 	stailq_create(&batch->rollback);
 }
@@ -254,6 +267,7 @@ tx_schedule_queue(struct stailq *queue)
 static void
 tx_schedule_commit(struct cmsg *msg)
 {
+	struct wal_writer *writer = &wal_writer_singleton;
 	struct wal_msg *batch = (struct wal_msg *) msg;
 	/*
 	 * Move the rollback list to the writer first, since
@@ -261,11 +275,13 @@ tx_schedule_commit(struct cmsg *msg)
 	 * iteration of tx_schedule_queue loop.
 	 */
 	if (! stailq_empty(&batch->rollback)) {
-		struct wal_writer *writer = &wal_writer_singleton;
 		/* Closes the input valve. */
 		stailq_concat(&writer->rollback, &batch->rollback);
 	}
 	tx_schedule_queue(&batch->commit);
+
+	if (batch->gc_executed)
+		writer->on_garbage_collection(&batch->gc_vclock);
 }
 
 static void
@@ -296,7 +312,8 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
 		  const char *wal_dirname, int64_t wal_max_rows,
 		  int64_t wal_max_size, const struct tt_uuid *instance_uuid,
 		  const struct vclock *vclock,
-		  const struct vclock *checkpoint_vclock)
+		  const struct vclock *checkpoint_vclock,
+		  wal_on_garbage_collection_f on_garbage_collection)
 {
 	writer->wal_mode = wal_mode;
 	writer->wal_max_rows = wal_max_rows;
@@ -315,6 +332,8 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
 	vclock_copy(&writer->vclock, vclock);
 	vclock_copy(&writer->checkpoint_vclock, checkpoint_vclock);
 	rlist_create(&writer->watchers);
+
+	writer->on_garbage_collection = on_garbage_collection;
 }
 
 /** Destroy a WAL writer structure. */
@@ -419,14 +438,15 @@ wal_open(struct wal_writer *writer)
 int
 wal_init(enum wal_mode wal_mode, const char *wal_dirname, int64_t wal_max_rows,
 	 int64_t wal_max_size, const struct tt_uuid *instance_uuid,
-	 const struct vclock *vclock, const struct vclock *checkpoint_vclock)
+	 const struct vclock *vclock, const struct vclock *checkpoint_vclock,
+	 wal_on_garbage_collection_f on_garbage_collection)
 {
 	assert(wal_max_rows > 1);
 
 	struct wal_writer *writer = &wal_writer_singleton;
 	wal_writer_create(writer, wal_mode, wal_dirname, wal_max_rows,
 			  wal_max_size, instance_uuid, vclock,
-			  checkpoint_vclock);
+			  checkpoint_vclock, on_garbage_collection);
 
 	/*
 	 * Scan the WAL directory to build an index of all
@@ -664,14 +684,14 @@ wal_opt_rotate(struct wal_writer *writer)
 }
 
 /**
- * Make sure there's enough disk space to append @len bytes
- * of data to the current WAL.
+ * Make sure there's enough disk space to process the given
+ * WAL message.
  *
  * If fallocate() fails with ENOSPC, delete old WAL files
  * that are not needed for recovery and retry.
  */
 static int
-wal_fallocate(struct wal_writer *writer, size_t len)
+wal_fallocate(struct wal_writer *writer, struct wal_msg *msg)
 {
 	bool warn_no_space = true;
 	struct xlog *l = &writer->current_wal;
@@ -688,7 +708,7 @@ wal_fallocate(struct wal_writer *writer, size_t len)
 	 * of encoded rows (compression, fixheaders). Double the
 	 * given length to get a rough upper bound estimate.
 	 */
-	len *= 2;
+	size_t len = msg->approx_len * 2;
 
 retry:
 	if (errinj == NULL || errinj->iparam == 0) {
@@ -722,7 +742,9 @@ retry:
 	}
 	diag_destroy(&diag);
 
-	wal_notify_watchers(writer, WAL_EVENT_GC);
+	msg->gc_executed = true;
+	if (xdir_first_vclock(&writer->wal_dir, &msg->gc_vclock) < 0)
+		vclock_copy(&msg->gc_vclock, &writer->vclock);
 	goto retry;
 error:
 	diag_log();
@@ -816,7 +838,7 @@ wal_write_to_disk(struct cmsg *msg)
 	}
 
 	/* Ensure there's enough disk space before writing anything. */
-	if (wal_fallocate(writer, wal_msg->approx_len) != 0) {
+	if (wal_fallocate(writer, wal_msg) != 0) {
 		stailq_concat(&wal_msg->rollback, &wal_msg->commit);
 		return wal_writer_begin_rollback(writer);
 	}
@@ -1115,7 +1137,6 @@ wal_watcher_notify(struct wal_watcher *watcher, unsigned events)
 	assert(!rlist_empty(&watcher->next));
 
 	struct wal_watcher_msg *msg = &watcher->msg;
-	struct wal_writer *writer = &wal_writer_singleton;
 
 	events &= watcher->event_mask;
 	if (events == 0) {
@@ -1134,9 +1155,6 @@ wal_watcher_notify(struct wal_watcher *watcher, unsigned events)
 	}
 
 	msg->events = events;
-	if (xdir_first_vclock(&writer->wal_dir, &msg->gc_vclock) < 0)
-		vclock_copy(&msg->gc_vclock, &writer->vclock);
-
 	cmsg_init(&msg->cmsg, watcher->route);
 	cpipe_push(&watcher->watcher_pipe, &msg->cmsg);
 }
diff --git a/src/box/wal.h b/src/box/wal.h
index 5f3a66ce..e5079552 100644
--- a/src/box/wal.h
+++ b/src/box/wal.h
@@ -52,13 +52,23 @@ extern int wal_dir_lock;
 extern "C" {
 #endif /* defined(__cplusplus) */
 
+/**
+ * Callback invoked on behalf of the tx thread upon request
+ * completion if the WAL thread ran out of disk space while
+ * performing a request and had to delete some old WAL files
+ * in order to continue. The vclock of the oldest WAL row
+ * still stored on the instance is returned in @vclock.
+ */
+typedef void (*wal_on_garbage_collection_f)(const struct vclock *vclock);
+
 void
 wal_thread_start();
 
 int
 wal_init(enum wal_mode wal_mode, const char *wal_dirname, int64_t wal_max_rows,
 	 int64_t wal_max_size, const struct tt_uuid *instance_uuid,
-	 const struct vclock *vclock, const struct vclock *checkpoint_vclock);
+	 const struct vclock *vclock, const struct vclock *checkpoint_vclock,
+	 wal_on_garbage_collection_f on_garbage_collection);
 
 void
 wal_thread_stop();
@@ -73,8 +83,6 @@ struct wal_watcher_msg {
 	struct wal_watcher *watcher;
 	/** Bit mask of events, see wal_event. */
 	unsigned events;
-	/** VClock of the oldest stored WAL row. */
-	struct vclock gc_vclock;
 };
 
 enum wal_event {
@@ -82,11 +90,6 @@ enum wal_event {
 	WAL_EVENT_WRITE		= (1 << 0),
 	/** A new WAL is created. */
 	WAL_EVENT_ROTATE	= (1 << 1),
-	/**
-	 * The WAL thread ran out of disk space and had to delete
-	 * one or more old WAL files.
-	 **/
-	WAL_EVENT_GC		= (1 << 2),
 };
 
 struct wal_watcher {
-- 
2.11.0




More information about the Tarantool-patches mailing list