[PATCH v2 3/4] wal: notify watchers about wal file removal
Vladimir Davydov
vdavydov.dev at gmail.com
Tue Oct 23 20:26:33 MSK 2018
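Notify WAL watchers with a new event, WAL_EVENT_GC, whenever old WAL
files are removed. To deliver event details, the watcher callback now
takes a wal_watcher_msg, which carries the event mask and the signature
of the oldest stored WAL row (gc_lsn).
We will use this event to kill consumers when the WAL thread removes
a WAL file on an ENOSPC error.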
Needed for #3397
---
src/box/relay.cc | 8 ++++++--
src/box/wal.c | 31 ++++++++++++++++++-------------
src/box/wal.h | 20 +++++++++++++++-----
src/box/xlog.h | 15 +++++++++++++++
4 files changed, 54 insertions(+), 20 deletions(-)
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 0a1e95af..6a93b469 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -406,9 +406,10 @@ relay_schedule_pending_gc(struct relay *relay, const struct vclock *vclock)
}
static void
-relay_process_wal_event(struct wal_watcher *watcher, unsigned events)
+relay_process_wal_event(struct wal_watcher_msg *msg)
{
- struct relay *relay = container_of(watcher, struct relay, wal_watcher);
+ struct relay *relay = container_of(msg->watcher, struct relay,
+ wal_watcher);
if (relay->state != RELAY_FOLLOW) {
/*
* Do not try to send anything to the replica
@@ -416,6 +417,9 @@ relay_process_wal_event(struct wal_watcher *watcher, unsigned events)
*/
return;
}
+ unsigned events = msg->events;
+ if ((events & (WAL_EVENT_WRITE | WAL_EVENT_ROTATE)) == 0)
+ return;
try {
recover_remaining_wals(relay->r, &relay->stream, NULL,
(events & WAL_EVENT_ROTATE) != 0);
diff --git a/src/box/wal.c b/src/box/wal.c
index baa3af65..ab4d3dd5 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -66,6 +66,9 @@ wal_write(struct journal *, struct journal_entry *);
static int64_t
wal_write_in_wal_mode_none(struct journal *, struct journal_entry *);
+static void
+wal_notify_watchers(struct wal_writer *writer, unsigned events);
+
/* WAL thread. */
struct wal_thread {
/** 'wal' thread doing the writes. */
@@ -548,8 +551,10 @@ struct wal_gc_msg
static int
wal_collect_garbage_f(struct cbus_call_msg *data)
{
+ struct wal_writer *writer = &wal_writer_singleton;
int64_t lsn = ((struct wal_gc_msg *)data)->lsn;
- xdir_collect_garbage(&wal_writer_singleton.wal_dir, lsn, 0);
+ xdir_collect_garbage(&writer->wal_dir, lsn, 0);
+ wal_notify_watchers(writer, WAL_EVENT_GC);
return 0;
}
@@ -567,9 +572,6 @@ wal_collect_garbage(int64_t lsn)
fiber_set_cancellable(cancellable);
}
-static void
-wal_notify_watchers(struct wal_writer *writer, unsigned events);
-
/**
* If there is no current WAL, try to open it, and close the
* previous WAL. We close the previous WAL only after opening
@@ -1031,7 +1033,10 @@ wal_watcher_notify(struct wal_watcher *watcher, unsigned events)
{
assert(!rlist_empty(&watcher->next));
- if (watcher->msg.cmsg.route != NULL) {
+ struct wal_watcher_msg *msg = &watcher->msg;
+ struct wal_writer *writer = &wal_writer_singleton;
+
+ if (msg->cmsg.route != NULL) {
/*
* If the notification message is still en route,
* mark the watcher to resend it as soon as it
@@ -1041,19 +1046,19 @@ wal_watcher_notify(struct wal_watcher *watcher, unsigned events)
return;
}
- watcher->msg.events = events;
- cmsg_init(&watcher->msg.cmsg, watcher->route);
- cpipe_push(&watcher->watcher_pipe, &watcher->msg.cmsg);
+ msg->events = events;
+ msg->gc_lsn = xdir_first_vclock(&writer->wal_dir, NULL);
+ if (msg->gc_lsn < 0)
+ msg->gc_lsn = vclock_sum(&writer->vclock);
+ cmsg_init(&msg->cmsg, watcher->route);
+ cpipe_push(&watcher->watcher_pipe, &msg->cmsg);
}
static void
wal_watcher_notify_perform(struct cmsg *cmsg)
{
struct wal_watcher_msg *msg = (struct wal_watcher_msg *) cmsg;
- struct wal_watcher *watcher = msg->watcher;
- unsigned events = msg->events;
-
- watcher->cb(watcher, events);
+ msg->watcher->cb(msg);
}
static void
@@ -1106,7 +1111,7 @@ wal_watcher_detach(void *arg)
void
wal_set_watcher(struct wal_watcher *watcher, const char *name,
- void (*watcher_cb)(struct wal_watcher *, unsigned events),
+ void (*watcher_cb)(struct wal_watcher_msg *),
void (*process_cb)(struct cbus_endpoint *))
{
assert(journal_is_initialized(&wal_writer_singleton.base));
diff --git a/src/box/wal.h b/src/box/wal.h
index 8ef1fb1d..4867ec3b 100644
--- a/src/box/wal.h
+++ b/src/box/wal.h
@@ -63,10 +63,18 @@ wal_init(enum wal_mode wal_mode, const char *wal_dirname,
void
wal_thread_stop();
+/**
+ * A notification message sent from the WAL to a watcher
+ * when a WAL event occurs.
+ */
struct wal_watcher_msg {
struct cmsg cmsg;
+ /** Pointer to the watcher this message is for. */
struct wal_watcher *watcher;
+ /** Bit mask of events, see wal_event. */
unsigned events;
+ /** Signature of the oldest stored WAL row. */
+ int64_t gc_lsn;
};
enum wal_event {
@@ -74,13 +82,15 @@ enum wal_event {
WAL_EVENT_WRITE = (1 << 0),
/** A new WAL is created. */
WAL_EVENT_ROTATE = (1 << 1),
+ /** One or more old WALs have been deleted. */
+ WAL_EVENT_GC = (1 << 2),
};
struct wal_watcher {
/** Link in wal_writer::watchers. */
struct rlist next;
/** The watcher callback function. */
- void (*cb)(struct wal_watcher *, unsigned events);
+ void (*cb)(struct wal_watcher_msg *);
/** Pipe from the watcher to WAL. */
struct cpipe wal_pipe;
/** Pipe from WAL to the watcher. */
@@ -114,16 +124,16 @@ struct wal_watcher {
* @param watcher WAL watcher to register.
* @param name Name of the cbus endpoint at the caller's cord.
* @param watcher_cb Callback to invoke from the caller's cord
- * upon receiving a WAL event. Apart from the
- * watcher itself, it takes a bit mask of events.
- * Events are described in wal_event enum.
+ * upon receiving a WAL event. It takes an object
+ * of type wal_watcher_msg that stores a pointer
+ * to the watcher and information about the event.
* @param process_cb Function called to process cbus messages
* while the watcher is being attached or NULL
* if the cbus loop is running elsewhere.
*/
void
wal_set_watcher(struct wal_watcher *watcher, const char *name,
- void (*watcher_cb)(struct wal_watcher *, unsigned events),
+ void (*watcher_cb)(struct wal_watcher_msg *),
void (*process_cb)(struct cbus_endpoint *));
/**
diff --git a/src/box/xlog.h b/src/box/xlog.h
index d08ef16f..cb5207c1 100644
--- a/src/box/xlog.h
+++ b/src/box/xlog.h
@@ -203,6 +203,21 @@ void
xdir_collect_inprogress(struct xdir *xdir);
/**
+ * Return LSN and vclock (unless @vclock is NULL) of the oldest
+ * file in a directory or -1 if the directory is empty.
+ */
+static inline int64_t
+xdir_first_vclock(struct xdir *xdir, struct vclock *vclock)
+{
+ struct vclock *first = vclockset_first(&xdir->index);
+ if (first == NULL)
+ return -1;
+ if (vclock != NULL)
+ vclock_copy(vclock, first);
+ return vclock_sum(first);
+}
+
+/**
* Return LSN and vclock (unless @vclock is NULL) of the newest
* file in a directory or -1 if the directory is empty.
*/
--
2.11.0
More information about the Tarantool-patches
mailing list