[PATCH 4/5] wal: notify watchers about wal file removal

Vladimir Davydov vdavydov.dev at gmail.com
Sun Oct 7 23:27:17 MSK 2018


We will use this event to kill consumers when the WAL thread removes
a WAL file on ENOSPC error.

Needed for #3397
---
 src/box/relay.cc |  8 ++++++--
 src/box/wal.c    | 31 ++++++++++++++++++-------------
 src/box/wal.h    | 20 +++++++++++++++-----
 src/box/xlog.h   | 15 +++++++++++++++
 4 files changed, 54 insertions(+), 20 deletions(-)

diff --git a/src/box/relay.cc b/src/box/relay.cc
index d5df487e..8f1ba6ac 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -396,9 +396,10 @@ relay_schedule_pending_gc(struct relay *relay, const struct vclock *vclock)
 }
 
 static void
-relay_process_wal_event(struct wal_watcher *watcher, unsigned events)
+relay_process_wal_event(struct wal_watcher_msg *msg)
 {
-	struct relay *relay = container_of(watcher, struct relay, wal_watcher);
+	struct relay *relay = container_of(msg->watcher, struct relay,
+					   wal_watcher);
 	if (relay->state != RELAY_FOLLOW) {
 		/*
 		 * Do not try to send anything to the replica
@@ -406,6 +407,9 @@ relay_process_wal_event(struct wal_watcher *watcher, unsigned events)
 		 */
 		return;
 	}
+	unsigned events = msg->events;
+	if ((events & (WAL_EVENT_WRITE | WAL_EVENT_ROTATE)) == 0)
+		return;
 	try {
 		recover_remaining_wals(relay->r, &relay->stream, NULL,
 				       (events & WAL_EVENT_ROTATE) != 0);
diff --git a/src/box/wal.c b/src/box/wal.c
index 2728318a..20b85f43 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -55,6 +55,9 @@ wal_write(struct journal *, struct journal_entry *);
 static int64_t
 wal_write_in_wal_mode_none(struct journal *, struct journal_entry *);
 
+static void
+wal_notify_watchers(struct wal_writer *writer, unsigned events);
+
 /* WAL thread. */
 struct wal_thread {
 	/** 'wal' thread doing the writes. */
@@ -537,8 +540,10 @@ struct wal_gc_msg
 static int
 wal_collect_garbage_f(struct cbus_call_msg *data)
 {
+	struct wal_writer *writer = &wal_writer_singleton;
 	int64_t lsn = ((struct wal_gc_msg *)data)->lsn;
-	xdir_collect_garbage(&wal_writer_singleton.wal_dir, lsn, -1, false);
+	xdir_collect_garbage(&writer->wal_dir, lsn, -1, false);
+	wal_notify_watchers(writer, WAL_EVENT_GC);
 	return 0;
 }
 
@@ -556,9 +561,6 @@ wal_collect_garbage(int64_t lsn)
 	fiber_set_cancellable(cancellable);
 }
 
-static void
-wal_notify_watchers(struct wal_writer *writer, unsigned events);
-
 /**
  * If there is no current WAL, try to open it, and close the
  * previous WAL. We close the previous WAL only after opening
@@ -1008,7 +1010,10 @@ wal_watcher_notify(struct wal_watcher *watcher, unsigned events)
 {
 	assert(!rlist_empty(&watcher->next));
 
-	if (watcher->msg.cmsg.route != NULL) {
+	struct wal_watcher_msg *msg = &watcher->msg;
+	struct wal_writer *writer = &wal_writer_singleton;
+
+	if (msg->cmsg.route != NULL) {
 		/*
 		 * If the notification message is still en route,
 		 * mark the watcher to resend it as soon as it
@@ -1018,19 +1023,19 @@ wal_watcher_notify(struct wal_watcher *watcher, unsigned events)
 		return;
 	}
 
-	watcher->msg.events = events;
-	cmsg_init(&watcher->msg.cmsg, watcher->route);
-	cpipe_push(&watcher->watcher_pipe, &watcher->msg.cmsg);
+	msg->events = events;
+	msg->gc_lsn = xdir_first_vclock(&writer->wal_dir, NULL);
+	if (msg->gc_lsn < 0)
+		msg->gc_lsn = vclock_sum(&writer->vclock);
+	cmsg_init(&msg->cmsg, watcher->route);
+	cpipe_push(&watcher->watcher_pipe, &msg->cmsg);
 }
 
 static void
 wal_watcher_notify_perform(struct cmsg *cmsg)
 {
 	struct wal_watcher_msg *msg = (struct wal_watcher_msg *) cmsg;
-	struct wal_watcher *watcher = msg->watcher;
-	unsigned events = msg->events;
-
-	watcher->cb(watcher, events);
+	msg->watcher->cb(msg);
 }
 
 static void
@@ -1083,7 +1088,7 @@ wal_watcher_detach(void *arg)
 
 void
 wal_set_watcher(struct wal_watcher *watcher, const char *name,
-		void (*watcher_cb)(struct wal_watcher *, unsigned events),
+		void (*watcher_cb)(struct wal_watcher_msg *),
 		void (*process_cb)(struct cbus_endpoint *))
 {
 	assert(journal_is_initialized(&wal_writer_singleton.base));
diff --git a/src/box/wal.h b/src/box/wal.h
index 8ef1fb1d..4867ec3b 100644
--- a/src/box/wal.h
+++ b/src/box/wal.h
@@ -63,10 +63,18 @@ wal_init(enum wal_mode wal_mode, const char *wal_dirname,
 void
 wal_thread_stop();
 
+/**
+ * A notification message sent from the WAL to a watcher
+ * when a WAL event occurs.
+ */
 struct wal_watcher_msg {
 	struct cmsg cmsg;
+	/** Pointer to the watcher this message is for. */
 	struct wal_watcher *watcher;
+	/** Bit mask of events, see wal_event. */
 	unsigned events;
+	/** Signature of the oldest stored WAL row. */
+	int64_t gc_lsn;
 };
 
 enum wal_event {
@@ -74,13 +82,15 @@ enum wal_event {
 	WAL_EVENT_WRITE		= (1 << 0),
 	/** A new WAL is created. */
 	WAL_EVENT_ROTATE	= (1 << 1),
+	/** One or more old WALs have been deleted. */
+	WAL_EVENT_GC		= (1 << 2),
 };
 
 struct wal_watcher {
 	/** Link in wal_writer::watchers. */
 	struct rlist next;
 	/** The watcher callback function. */
-	void (*cb)(struct wal_watcher *, unsigned events);
+	void (*cb)(struct wal_watcher_msg *);
 	/** Pipe from the watcher to WAL. */
 	struct cpipe wal_pipe;
 	/** Pipe from WAL to the watcher. */
@@ -114,16 +124,16 @@ struct wal_watcher {
  * @param watcher     WAL watcher to register.
  * @param name        Name of the cbus endpoint at the caller's cord.
  * @param watcher_cb  Callback to invoke from the caller's cord
- *                    upon receiving a WAL event. Apart from the
- *                    watcher itself, it takes a bit mask of events.
- *                    Events are described in wal_event enum.
+ *                    upon receiving a WAL event. It takes an object
+ *                    of type wal_watcher_msg that stores a pointer
+ *                    to the watcher and information about the event.
  * @param process_cb  Function called to process cbus messages
  *                    while the watcher is being attached or NULL
  *                    if the cbus loop is running elsewhere.
  */
 void
 wal_set_watcher(struct wal_watcher *watcher, const char *name,
-		void (*watcher_cb)(struct wal_watcher *, unsigned events),
+		void (*watcher_cb)(struct wal_watcher_msg *),
 		void (*process_cb)(struct cbus_endpoint *));
 
 /**
diff --git a/src/box/xlog.h b/src/box/xlog.h
index 2233c022..75bc610b 100644
--- a/src/box/xlog.h
+++ b/src/box/xlog.h
@@ -198,6 +198,21 @@ void
 xdir_collect_inprogress(struct xdir *xdir);
 
 /**
+ * Return LSN and vclock (unless @vclock is NULL) of the oldest
+ * file in a directory or -1 if the directory is empty.
+ */
+static inline int64_t
+xdir_first_vclock(struct xdir *xdir, struct vclock *vclock)
+{
+	struct vclock *first = vclockset_first(&xdir->index);
+	if (first == NULL)
+		return -1;
+	if (vclock != NULL)
+		vclock_copy(vclock, first);
+	return vclock_sum(first);
+}
+
+/**
  * Return LSN and vclock (unless @vclock is NULL) of the newest
  * file in a directory or -1 if the directory is empty.
  */
-- 
2.11.0




More information about the Tarantool-patches mailing list