Tarantool development patches archive
 help / color / mirror / Atom feed
From: Vladimir Davydov <vdavydov.dev@gmail.com>
To: kostja@tarantool.org
Cc: tarantool-patches@freelists.org
Subject: [PATCH 4/5] wal: notify watchers about wal file removal
Date: Sun,  7 Oct 2018 23:27:17 +0300	[thread overview]
Message-ID: <ae88c694a30c04a01b8f8c762b9106a033687b57.1538942600.git.vdavydov.dev@gmail.com> (raw)
In-Reply-To: <cover.1538942600.git.vdavydov.dev@gmail.com>

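Introduce a new WAL watcher event, WAL_EVENT_GC, which is sent to all
watchers after the WAL thread has run garbage collection on old WAL
files. To deliver extra information along with the event, the watcher
callback now takes the whole wal_watcher_msg instead of the watcher and
the event mask; the message additionally carries gc_lsn, the signature
of the oldest WAL row that is still stored.

We will use this event to kill consumers when the WAL thread removes
a WAL file on an ENOSPC error.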

Needed for #3397
---
 src/box/relay.cc |  8 ++++++--
 src/box/wal.c    | 31 ++++++++++++++++++-------------
 src/box/wal.h    | 20 +++++++++++++++-----
 src/box/xlog.h   | 15 +++++++++++++++
 4 files changed, 54 insertions(+), 20 deletions(-)

diff --git a/src/box/relay.cc b/src/box/relay.cc
index d5df487e..8f1ba6ac 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -396,9 +396,10 @@ relay_schedule_pending_gc(struct relay *relay, const struct vclock *vclock)
 }
 
 static void
-relay_process_wal_event(struct wal_watcher *watcher, unsigned events)
+relay_process_wal_event(struct wal_watcher_msg *msg)
 {
-	struct relay *relay = container_of(watcher, struct relay, wal_watcher);
+	struct relay *relay = container_of(msg->watcher, struct relay,
+					   wal_watcher);
 	if (relay->state != RELAY_FOLLOW) {
 		/*
 		 * Do not try to send anything to the replica
@@ -406,6 +407,9 @@ relay_process_wal_event(struct wal_watcher *watcher, unsigned events)
 		 */
 		return;
 	}
+	unsigned events = msg->events;
+	if ((events & (WAL_EVENT_WRITE | WAL_EVENT_ROTATE)) == 0)
+		return;
 	try {
 		recover_remaining_wals(relay->r, &relay->stream, NULL,
 				       (events & WAL_EVENT_ROTATE) != 0);
diff --git a/src/box/wal.c b/src/box/wal.c
index 2728318a..20b85f43 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -55,6 +55,9 @@ wal_write(struct journal *, struct journal_entry *);
 static int64_t
 wal_write_in_wal_mode_none(struct journal *, struct journal_entry *);
 
+static void
+wal_notify_watchers(struct wal_writer *writer, unsigned events);
+
 /* WAL thread. */
 struct wal_thread {
 	/** 'wal' thread doing the writes. */
@@ -537,8 +540,10 @@ struct wal_gc_msg
 static int
 wal_collect_garbage_f(struct cbus_call_msg *data)
 {
+	struct wal_writer *writer = &wal_writer_singleton;
 	int64_t lsn = ((struct wal_gc_msg *)data)->lsn;
-	xdir_collect_garbage(&wal_writer_singleton.wal_dir, lsn, -1, false);
+	xdir_collect_garbage(&writer->wal_dir, lsn, -1, false);
+	wal_notify_watchers(writer, WAL_EVENT_GC);
 	return 0;
 }
 
@@ -556,9 +561,6 @@ wal_collect_garbage(int64_t lsn)
 	fiber_set_cancellable(cancellable);
 }
 
-static void
-wal_notify_watchers(struct wal_writer *writer, unsigned events);
-
 /**
  * If there is no current WAL, try to open it, and close the
  * previous WAL. We close the previous WAL only after opening
@@ -1008,7 +1010,10 @@ wal_watcher_notify(struct wal_watcher *watcher, unsigned events)
 {
 	assert(!rlist_empty(&watcher->next));
 
-	if (watcher->msg.cmsg.route != NULL) {
+	struct wal_watcher_msg *msg = &watcher->msg;
+	struct wal_writer *writer = &wal_writer_singleton;
+
+	if (msg->cmsg.route != NULL) {
 		/*
 		 * If the notification message is still en route,
 		 * mark the watcher to resend it as soon as it
@@ -1018,19 +1023,19 @@ wal_watcher_notify(struct wal_watcher *watcher, unsigned events)
 		return;
 	}
 
-	watcher->msg.events = events;
-	cmsg_init(&watcher->msg.cmsg, watcher->route);
-	cpipe_push(&watcher->watcher_pipe, &watcher->msg.cmsg);
+	msg->events = events;
+	msg->gc_lsn = xdir_first_vclock(&writer->wal_dir, NULL);
+	if (msg->gc_lsn < 0)
+		msg->gc_lsn = vclock_sum(&writer->vclock);
+	cmsg_init(&msg->cmsg, watcher->route);
+	cpipe_push(&watcher->watcher_pipe, &msg->cmsg);
 }
 
 static void
 wal_watcher_notify_perform(struct cmsg *cmsg)
 {
 	struct wal_watcher_msg *msg = (struct wal_watcher_msg *) cmsg;
-	struct wal_watcher *watcher = msg->watcher;
-	unsigned events = msg->events;
-
-	watcher->cb(watcher, events);
+	msg->watcher->cb(msg);
 }
 
 static void
@@ -1083,7 +1088,7 @@ wal_watcher_detach(void *arg)
 
 void
 wal_set_watcher(struct wal_watcher *watcher, const char *name,
-		void (*watcher_cb)(struct wal_watcher *, unsigned events),
+		void (*watcher_cb)(struct wal_watcher_msg *),
 		void (*process_cb)(struct cbus_endpoint *))
 {
 	assert(journal_is_initialized(&wal_writer_singleton.base));
diff --git a/src/box/wal.h b/src/box/wal.h
index 8ef1fb1d..4867ec3b 100644
--- a/src/box/wal.h
+++ b/src/box/wal.h
@@ -63,10 +63,18 @@ wal_init(enum wal_mode wal_mode, const char *wal_dirname,
 void
 wal_thread_stop();
 
+/**
+ * A notification message sent from the WAL to a watcher
+ * when a WAL event occurs.
+ */
 struct wal_watcher_msg {
 	struct cmsg cmsg;
+	/** Pointer to the watcher this message is for. */
 	struct wal_watcher *watcher;
+	/** Bit mask of events, see wal_event. */
 	unsigned events;
+	/** Signature of the oldest stored WAL row. */
+	int64_t gc_lsn;
 };
 
 enum wal_event {
@@ -74,13 +82,15 @@ enum wal_event {
 	WAL_EVENT_WRITE		= (1 << 0),
 	/** A new WAL is created. */
 	WAL_EVENT_ROTATE	= (1 << 1),
+	/** One or more old WALs have been deleted. */
+	WAL_EVENT_GC		= (1 << 2),
 };
 
 struct wal_watcher {
 	/** Link in wal_writer::watchers. */
 	struct rlist next;
 	/** The watcher callback function. */
-	void (*cb)(struct wal_watcher *, unsigned events);
+	void (*cb)(struct wal_watcher_msg *);
 	/** Pipe from the watcher to WAL. */
 	struct cpipe wal_pipe;
 	/** Pipe from WAL to the watcher. */
@@ -114,16 +124,16 @@ struct wal_watcher {
  * @param watcher     WAL watcher to register.
  * @param name        Name of the cbus endpoint at the caller's cord.
  * @param watcher_cb  Callback to invoke from the caller's cord
- *                    upon receiving a WAL event. Apart from the
- *                    watcher itself, it takes a bit mask of events.
- *                    Events are described in wal_event enum.
+ *                    upon receiving a WAL event. It takes an object
+ *                    of type wal_watcher_msg that stores a pointer
+ *                    to the watcher and information about the event.
  * @param process_cb  Function called to process cbus messages
  *                    while the watcher is being attached or NULL
  *                    if the cbus loop is running elsewhere.
  */
 void
 wal_set_watcher(struct wal_watcher *watcher, const char *name,
-		void (*watcher_cb)(struct wal_watcher *, unsigned events),
+		void (*watcher_cb)(struct wal_watcher_msg *),
 		void (*process_cb)(struct cbus_endpoint *));
 
 /**
diff --git a/src/box/xlog.h b/src/box/xlog.h
index 2233c022..75bc610b 100644
--- a/src/box/xlog.h
+++ b/src/box/xlog.h
@@ -198,6 +198,21 @@ void
 xdir_collect_inprogress(struct xdir *xdir);
 
 /**
+ * Return LSN and vclock (unless @vclock is NULL) of the oldest
+ * file in a directory or -1 if the directory is empty.
+ */
+static inline int64_t
+xdir_first_vclock(struct xdir *xdir, struct vclock *vclock)
+{
+	struct vclock *first = vclockset_first(&xdir->index);
+	if (first == NULL)
+		return -1;
+	if (vclock != NULL)
+		vclock_copy(vclock, first);
+	return vclock_sum(first);
+}
+
+/**
  * Return LSN and vclock (unless @vclock is NULL) of the newest
  * file in a directory or -1 if the directory is empty.
  */
-- 
2.11.0

  parent reply	other threads:[~2018-10-07 20:27 UTC|newest]

Thread overview: 13+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2018-10-07 20:27 [PATCH 0/5] Delete old WAL files if running out of disk space Vladimir Davydov
2018-10-07 20:27 ` [PATCH 1/5] xlog: fix filename in error messages Vladimir Davydov
2018-10-12  8:19   ` Vladimir Davydov
2018-10-16 19:07   ` [tarantool-patches] " Konstantin Osipov
2018-10-07 20:27 ` [PATCH 2/5] wal: preallocate disk space before writing rows Vladimir Davydov
2018-10-16 19:09   ` [tarantool-patches] " Konstantin Osipov
2018-10-07 20:27 ` [PATCH 3/5] xlog: allow to limit number of files deleted by xdir_collect_garbage Vladimir Davydov
2018-10-07 20:27 ` Vladimir Davydov [this message]
2018-10-07 20:27 ` [PATCH 5/5] wal: delete old wal files when running out of disk space Vladimir Davydov
2018-10-16 19:05 ` [tarantool-patches] Re: [PATCH 0/5] Delete old WAL files if " Konstantin Osipov
2018-10-17  8:20   ` Vladimir Davydov
2018-10-23  8:37     ` Vladimir Davydov
2018-10-23  8:46       ` Konstantin Osipov

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=ae88c694a30c04a01b8f8c762b9106a033687b57.1538942600.git.vdavydov.dev@gmail.com \
    --to=vdavydov.dev@gmail.com \
    --cc=kostja@tarantool.org \
    --cc=tarantool-patches@freelists.org \
    --subject='Re: [PATCH 4/5] wal: notify watchers about wal file removal' \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

This is a public inbox; see the mirroring instructions
for how to clone and mirror all data and code used for this inbox.