[PATCH v2 01/10] gc: do not use WAL watcher API for deactivating stale consumers

Vladimir Davydov vdavydov.dev at gmail.com
Sat Dec 8 18:48:05 MSK 2018


The WAL thread may delete old WAL files if it gets an ENOSPC error.
Currently, we use the WAL watcher API to notify the TX thread about it
so that it can shoot off stale replicas. This looks ugly, because the
WAL watcher API was initially designed to propagate WAL changes to
relay threads and the new event WAL_EVENT_GC, which was introduced for
notifying about ENOSPC-driven garbage collection, isn't used anywhere
else. Besides, there's already a pipe from WAL to TX - we could reuse it
instead of opening another one.

If we went down that path, then in order to trigger a checkpoint
from the WAL thread (see #1082), we would have to introduce yet another
esoteric WAL watcher event, making the whole design look even uglier.
That said, let's rewrite the garbage collection notification procedure
using a plain callback instead of abusing the WAL watcher API.
---
 src/box/box.cc |  9 +++++--
 src/box/gc.c   | 33 ++++---------------------
 src/box/gc.h   | 19 ++++++---------
 src/box/wal.c  | 76 ++++++++++++++++++++++++++++++++++++++++++++++++----------
 src/box/wal.h  | 19 ++++++++-------
 5 files changed, 92 insertions(+), 64 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index bb7c1bb9..20412af4 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -2011,6 +2011,12 @@ tx_prio_cb(struct ev_loop *loop, ev_watcher *watcher, int events)
 	cbus_process(endpoint);
 }
 
+static void
+on_wal_garbage_collection(const struct vclock *vclock)
+{
+	gc_advance(vclock);
+}
+
 void
 box_init(void)
 {
@@ -2125,10 +2131,9 @@ box_cfg_xc(void)
 	enum wal_mode wal_mode = box_check_wal_mode(cfg_gets("wal_mode"));
 	if (wal_init(wal_mode, cfg_gets("wal_dir"), wal_max_rows,
 		     wal_max_size, &INSTANCE_UUID, &replicaset.vclock,
-		     &checkpoint->vclock) != 0) {
+		     &checkpoint->vclock, on_wal_garbage_collection) != 0) {
 		diag_raise();
 	}
-	gc_set_wal_watcher();
 
 	rmean_cleanup(rmean_box);
 
diff --git a/src/box/gc.c b/src/box/gc.c
index 9c049977..87273b8d 100644
--- a/src/box/gc.c
+++ b/src/box/gc.c
@@ -113,26 +113,6 @@ gc_init(void)
 	fiber_start(gc.fiber);
 }
 
-static void
-gc_process_wal_event(struct wal_watcher_msg *);
-
-void
-gc_set_wal_watcher(void)
-{
-	/*
-	 * Since the function is called from box_cfg() it is
-	 * important that we do not pass a message processing
-	 * callback to wal_set_watcher(). Doing so would cause
-	 * credentials corruption in the fiber executing
-	 * box_cfg() in case it processes some iproto messages.
-	 * Besides, by the time the function is called
-	 * tx_fiber_pool is already set up and it will process
-	 * all the messages directed to "tx" endpoint safely.
-	 */
-	wal_set_watcher(&gc.wal_watcher, "tx", gc_process_wal_event,
-			NULL, WAL_EVENT_GC);
-}
-
 void
 gc_free(void)
 {
@@ -270,25 +250,20 @@ gc_wait(void)
 		fiber_cond_wait(&gc.cond);
 }
 
-/**
- * Deactivate consumers that need files deleted by the WAL thread.
- */
-static void
-gc_process_wal_event(struct wal_watcher_msg *msg)
+void
+gc_advance(const struct vclock *vclock)
 {
-	assert((msg->events & WAL_EVENT_GC) != 0);
-
 	/*
 	 * In case of emergency ENOSPC, the WAL thread may delete
 	 * WAL files needed to restore from backup checkpoints,
 	 * which would be kept by the garbage collector otherwise.
 	 * Bring the garbage collector vclock up to date.
 	 */
-	vclock_copy(&gc.vclock, &msg->gc_vclock);
+	vclock_copy(&gc.vclock, vclock);
 
 	struct gc_consumer *consumer = gc_tree_first(&gc.consumers);
 	while (consumer != NULL &&
-	       vclock_sum(&consumer->vclock) < vclock_sum(&msg->gc_vclock)) {
+	       vclock_sum(&consumer->vclock) < vclock_sum(vclock)) {
 		struct gc_consumer *next = gc_tree_next(&gc.consumers,
 							consumer);
 		assert(!consumer->is_inactive);
diff --git a/src/box/gc.h b/src/box/gc.h
index 6e96d7bb..a141ace6 100644
--- a/src/box/gc.h
+++ b/src/box/gc.h
@@ -36,7 +36,6 @@
 
 #include "fiber_cond.h"
 #include "vclock.h"
-#include "wal.h"
 #include "trivia/util.h"
 
 #if defined(__cplusplus)
@@ -122,11 +121,6 @@ struct gc_state {
 	struct rlist checkpoints;
 	/** Registered consumers, linked by gc_consumer::node. */
 	gc_tree_t consumers;
-	/**
-	 * WAL event watcher. Needed to shoot off stale consumers
-	 * when a WAL file is deleted due to ENOSPC.
-	 */
-	struct wal_watcher wal_watcher;
 	/** Fiber that removes old files in the background. */
 	struct fiber *fiber;
 	/**
@@ -192,12 +186,6 @@ void
 gc_init(void);
 
 /**
- * Set WAL watcher. Called after WAL is initialized.
- */
-void
-gc_set_wal_watcher(void);
-
-/**
  * Destroy the garbage collection state.
  */
 void
@@ -211,6 +199,13 @@ void
 gc_wait(void);
 
 /**
+ * Advance the garbage collector vclock to the given position.
+ * Deactivate WAL consumers that need older data.
+ */
+void
+gc_advance(const struct vclock *vclock);
+
+/**
  * Update the minimal number of checkpoints to preserve.
  * Called when box.cfg.checkpoint_count is updated.
  *
diff --git a/src/box/wal.c b/src/box/wal.c
index 3b5b9492..0775dbae 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -91,6 +91,7 @@ struct wal_writer
 {
 	struct journal base;
 	/* ----------------- tx ------------------- */
+	wal_on_garbage_collection_f on_garbage_collection;
 	/**
 	 * The rollback queue. An accumulator for all requests
 	 * that need to be rolled back. Also acts as a valve
@@ -254,6 +255,7 @@ tx_schedule_queue(struct stailq *queue)
 static void
 tx_schedule_commit(struct cmsg *msg)
 {
+	struct wal_writer *writer = &wal_writer_singleton;
 	struct wal_msg *batch = (struct wal_msg *) msg;
 	/*
 	 * Move the rollback list to the writer first, since
@@ -261,7 +263,6 @@ tx_schedule_commit(struct cmsg *msg)
 	 * iteration of tx_schedule_queue loop.
 	 */
 	if (! stailq_empty(&batch->rollback)) {
-		struct wal_writer *writer = &wal_writer_singleton;
 		/* Closes the input valve. */
 		stailq_concat(&writer->rollback, &batch->rollback);
 	}
@@ -286,6 +287,28 @@ tx_schedule_rollback(struct cmsg *msg)
 	stailq_create(&writer->rollback);
 }
 
+
+/**
+ * This message is sent from WAL to TX when the WAL thread hits
+ * ENOSPC and has to delete some backup WAL files to continue.
+ * The TX thread uses this message to shoot off WAL consumers
+ * that needed deleted WAL files.
+ */
+struct tx_notify_gc_msg {
+	struct cmsg base;
+	/** VClock of the oldest WAL row preserved by WAL. */
+	struct vclock vclock;
+};
+
+static void
+tx_notify_gc(struct cmsg *msg)
+{
+	struct wal_writer *writer = &wal_writer_singleton;
+	struct vclock *vclock = &((struct tx_notify_gc_msg *)msg)->vclock;
+	writer->on_garbage_collection(vclock);
+	free(msg);
+}
+
 /**
  * Initialize WAL writer context. Even though it's a singleton,
  * encapsulate the details just in case we may use
@@ -296,7 +319,8 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
 		  const char *wal_dirname, int64_t wal_max_rows,
 		  int64_t wal_max_size, const struct tt_uuid *instance_uuid,
 		  const struct vclock *vclock,
-		  const struct vclock *checkpoint_vclock)
+		  const struct vclock *checkpoint_vclock,
+		  wal_on_garbage_collection_f on_garbage_collection)
 {
 	writer->wal_mode = wal_mode;
 	writer->wal_max_rows = wal_max_rows;
@@ -315,6 +339,8 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
 	vclock_copy(&writer->vclock, vclock);
 	vclock_copy(&writer->checkpoint_vclock, checkpoint_vclock);
 	rlist_create(&writer->watchers);
+
+	writer->on_garbage_collection = on_garbage_collection;
 }
 
 /** Destroy a WAL writer structure. */
@@ -419,14 +445,15 @@ wal_open(struct wal_writer *writer)
 int
 wal_init(enum wal_mode wal_mode, const char *wal_dirname, int64_t wal_max_rows,
 	 int64_t wal_max_size, const struct tt_uuid *instance_uuid,
-	 const struct vclock *vclock, const struct vclock *checkpoint_vclock)
+	 const struct vclock *vclock, const struct vclock *checkpoint_vclock,
+	 wal_on_garbage_collection_f on_garbage_collection)
 {
 	assert(wal_max_rows > 1);
 
 	struct wal_writer *writer = &wal_writer_singleton;
 	wal_writer_create(writer, wal_mode, wal_dirname, wal_max_rows,
 			  wal_max_size, instance_uuid, vclock,
-			  checkpoint_vclock);
+			  checkpoint_vclock, on_garbage_collection);
 
 	/*
 	 * Scan the WAL directory to build an index of all
@@ -673,9 +700,10 @@ wal_opt_rotate(struct wal_writer *writer)
 static int
 wal_fallocate(struct wal_writer *writer, size_t len)
 {
-	bool warn_no_space = true;
+	bool warn_no_space = true, notify_gc = false;
 	struct xlog *l = &writer->current_wal;
 	struct errinj *errinj = errinj(ERRINJ_WAL_FALLOCATE, ERRINJ_INT);
+	int rc = 0;
 
 	/*
 	 * Max LSN that can be collected in case of ENOSPC -
@@ -693,9 +721,9 @@ wal_fallocate(struct wal_writer *writer, size_t len)
 retry:
 	if (errinj == NULL || errinj->iparam == 0) {
 		if (l->allocated >= len)
-			return 0;
+			goto out;
 		if (xlog_fallocate(l, MAX(len, WAL_FALLOCATE_LEN)) == 0)
-			return 0;
+			goto out;
 	} else {
 		errinj->iparam--;
 		diag_set(ClientError, ER_INJECTION, "xlog fallocate");
@@ -722,11 +750,37 @@ retry:
 	}
 	diag_destroy(&diag);
 
-	wal_notify_watchers(writer, WAL_EVENT_GC);
+	notify_gc = true;
 	goto retry;
 error:
 	diag_log();
-	return -1;
+	rc = -1;
+out:
+	/*
+	 * Notify the TX thread if the WAL thread had to delete
+	 * some WAL files to proceed so that TX can shoot off WAL
+	 * consumers that still need those files.
+	 *
+	 * We allocate the message with malloc() and we ignore
+	 * allocation failures, because this is a pretty rare
+	 * event and a failure to send this message isn't really
+	 * critical.
+	 */
+	if (notify_gc) {
+		static struct cmsg_hop route[] = {
+			{ tx_notify_gc, NULL },
+		};
+		struct tx_notify_gc_msg *msg = malloc(sizeof(*msg));
+		if (msg != NULL) {
+			if (xdir_first_vclock(&writer->wal_dir,
+					      &msg->vclock) < 0)
+				vclock_copy(&msg->vclock, &writer->vclock);
+			cmsg_init(&msg->base, route);
+			cpipe_push(&wal_thread.tx_prio_pipe, &msg->base);
+		} else
+			say_warn("failed to allocate gc notification message");
+	}
+	return rc;
 }
 
 static void
@@ -1115,7 +1169,6 @@ wal_watcher_notify(struct wal_watcher *watcher, unsigned events)
 	assert(!rlist_empty(&watcher->next));
 
 	struct wal_watcher_msg *msg = &watcher->msg;
-	struct wal_writer *writer = &wal_writer_singleton;
 
 	events &= watcher->event_mask;
 	if (events == 0) {
@@ -1134,9 +1187,6 @@ wal_watcher_notify(struct wal_watcher *watcher, unsigned events)
 	}
 
 	msg->events = events;
-	if (xdir_first_vclock(&writer->wal_dir, &msg->gc_vclock) < 0)
-		vclock_copy(&msg->gc_vclock, &writer->vclock);
-
 	cmsg_init(&msg->cmsg, watcher->route);
 	cpipe_push(&watcher->watcher_pipe, &msg->cmsg);
 }
diff --git a/src/box/wal.h b/src/box/wal.h
index 3c9eb42f..6e5a5458 100644
--- a/src/box/wal.h
+++ b/src/box/wal.h
@@ -52,13 +52,23 @@ extern int wal_dir_lock;
 extern "C" {
 #endif /* defined(__cplusplus) */
 
+/**
+ * Callback invoked in the TX thread when the WAL thread runs out
+ * of disk space and has to delete some old WAL files to continue.
+ * It is supposed to shoot off WAL consumers that need the deleted
+ * files. The vclock of the oldest WAL row still available on the
+ * instance is passed in @vclock.
+ */
+typedef void (*wal_on_garbage_collection_f)(const struct vclock *vclock);
+
 void
 wal_thread_start();
 
 int
 wal_init(enum wal_mode wal_mode, const char *wal_dirname, int64_t wal_max_rows,
 	 int64_t wal_max_size, const struct tt_uuid *instance_uuid,
-	 const struct vclock *vclock, const struct vclock *checkpoint_vclock);
+	 const struct vclock *vclock, const struct vclock *checkpoint_vclock,
+	 wal_on_garbage_collection_f on_garbage_collection);
 
 void
 wal_thread_stop();
@@ -73,8 +83,6 @@ struct wal_watcher_msg {
 	struct wal_watcher *watcher;
 	/** Bit mask of events, see wal_event. */
 	unsigned events;
-	/** VClock of the oldest stored WAL row. */
-	struct vclock gc_vclock;
 };
 
 enum wal_event {
@@ -82,11 +90,6 @@ enum wal_event {
 	WAL_EVENT_WRITE		= (1 << 0),
 	/** A new WAL is created. */
 	WAL_EVENT_ROTATE	= (1 << 1),
-	/**
-	 * The WAL thread ran out of disk space and had to delete
-	 * one or more old WAL files.
-	 **/
-	WAL_EVENT_GC		= (1 << 2),
 };
 
 struct wal_watcher {
-- 
2.11.0




More information about the Tarantool-patches mailing list