From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: From: Vladimir Davydov Subject: [PATCH v2 3/4] wal: notify watchers about wal file removal Date: Tue, 23 Oct 2018 20:26:33 +0300 Message-Id: <1f07297002d2dfb73afdee2e31408cbad8c5f305.1540314925.git.vdavydov.dev@gmail.com> In-Reply-To: References: In-Reply-To: References: To: kostja@tarantool.org Cc: tarantool-patches@freelists.org List-ID: We will use this event to kill consumers when the WAL thread removes a WAL file on ENOSPC error. Needed for #3397 --- src/box/relay.cc | 8 ++++++-- src/box/wal.c | 31 ++++++++++++++++++------------- src/box/wal.h | 20 +++++++++++++++----- src/box/xlog.h | 15 +++++++++++++++ 4 files changed, 54 insertions(+), 20 deletions(-) diff --git a/src/box/relay.cc b/src/box/relay.cc index 0a1e95af..6a93b469 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -406,9 +406,10 @@ relay_schedule_pending_gc(struct relay *relay, const struct vclock *vclock) } static void -relay_process_wal_event(struct wal_watcher *watcher, unsigned events) +relay_process_wal_event(struct wal_watcher_msg *msg) { - struct relay *relay = container_of(watcher, struct relay, wal_watcher); + struct relay *relay = container_of(msg->watcher, struct relay, + wal_watcher); if (relay->state != RELAY_FOLLOW) { /* * Do not try to send anything to the replica @@ -416,6 +417,9 @@ relay_process_wal_event(struct wal_watcher *watcher, unsigned events) */ return; } + unsigned events = msg->events; + if ((events & (WAL_EVENT_WRITE | WAL_EVENT_ROTATE)) == 0) + return; try { recover_remaining_wals(relay->r, &relay->stream, NULL, (events & WAL_EVENT_ROTATE) != 0); diff --git a/src/box/wal.c b/src/box/wal.c index baa3af65..ab4d3dd5 100644 --- a/src/box/wal.c +++ b/src/box/wal.c @@ -66,6 +66,9 @@ wal_write(struct journal *, struct journal_entry *); static int64_t wal_write_in_wal_mode_none(struct journal *, struct journal_entry *); +static void +wal_notify_watchers(struct wal_writer *writer, unsigned events); + /* WAL thread. */ struct wal_thread { /** 'wal' thread doing the writes. */ @@ -548,8 +551,10 @@ struct wal_gc_msg static int wal_collect_garbage_f(struct cbus_call_msg *data) { + struct wal_writer *writer = &wal_writer_singleton; int64_t lsn = ((struct wal_gc_msg *)data)->lsn; - xdir_collect_garbage(&wal_writer_singleton.wal_dir, lsn, 0); + xdir_collect_garbage(&writer->wal_dir, lsn, 0); + wal_notify_watchers(writer, WAL_EVENT_GC); return 0; } @@ -567,9 +572,6 @@ wal_collect_garbage(int64_t lsn) fiber_set_cancellable(cancellable); } -static void -wal_notify_watchers(struct wal_writer *writer, unsigned events); - /** * If there is no current WAL, try to open it, and close the * previous WAL. We close the previous WAL only after opening @@ -1031,7 +1033,10 @@ wal_watcher_notify(struct wal_watcher *watcher, unsigned events) { assert(!rlist_empty(&watcher->next)); - if (watcher->msg.cmsg.route != NULL) { + struct wal_watcher_msg *msg = &watcher->msg; + struct wal_writer *writer = &wal_writer_singleton; + + if (msg->cmsg.route != NULL) { /* * If the notification message is still en route, * mark the watcher to resend it as soon as it @@ -1041,19 +1046,19 @@ wal_watcher_notify(struct wal_watcher *watcher, unsigned events) return; } - watcher->msg.events = events; - cmsg_init(&watcher->msg.cmsg, watcher->route); - cpipe_push(&watcher->watcher_pipe, &watcher->msg.cmsg); + msg->events = events; + msg->gc_lsn = xdir_first_vclock(&writer->wal_dir, NULL); + if (msg->gc_lsn < 0) + msg->gc_lsn = vclock_sum(&writer->vclock); + cmsg_init(&msg->cmsg, watcher->route); + cpipe_push(&watcher->watcher_pipe, &msg->cmsg); } static void wal_watcher_notify_perform(struct cmsg *cmsg) { struct wal_watcher_msg *msg = (struct wal_watcher_msg *) cmsg; - struct wal_watcher *watcher = msg->watcher; - unsigned events = msg->events; - - watcher->cb(watcher, events); + msg->watcher->cb(msg); } static void @@ -1106,7 +1111,7 @@ wal_watcher_detach(void *arg) void wal_set_watcher(struct wal_watcher *watcher, const char *name, - void (*watcher_cb)(struct wal_watcher *, unsigned events), + void (*watcher_cb)(struct wal_watcher_msg *), void (*process_cb)(struct cbus_endpoint *)) { assert(journal_is_initialized(&wal_writer_singleton.base)); diff --git a/src/box/wal.h b/src/box/wal.h index 8ef1fb1d..4867ec3b 100644 --- a/src/box/wal.h +++ b/src/box/wal.h @@ -63,10 +63,18 @@ wal_init(enum wal_mode wal_mode, const char *wal_dirname, void wal_thread_stop(); +/** + * A notification message sent from the WAL to a watcher + * when a WAL event occurs. + */ struct wal_watcher_msg { struct cmsg cmsg; + /** Pointer to the watcher this message is for. */ struct wal_watcher *watcher; + /** Bit mask of events, see wal_event. */ unsigned events; + /** Signature of the oldest stored WAL row. */ + int64_t gc_lsn; }; enum wal_event { @@ -74,13 +82,15 @@ enum wal_event { WAL_EVENT_WRITE = (1 << 0), /** A new WAL is created. */ WAL_EVENT_ROTATE = (1 << 1), + /** One or more old WALs have been deleted. */ + WAL_EVENT_GC = (1 << 2), }; struct wal_watcher { /** Link in wal_writer::watchers. */ struct rlist next; /** The watcher callback function. */ - void (*cb)(struct wal_watcher *, unsigned events); + void (*cb)(struct wal_watcher_msg *); /** Pipe from the watcher to WAL. */ struct cpipe wal_pipe; /** Pipe from WAL to the watcher. */ @@ -114,16 +124,16 @@ struct wal_watcher { * @param watcher WAL watcher to register. * @param name Name of the cbus endpoint at the caller's cord. * @param watcher_cb Callback to invoke from the caller's cord - * upon receiving a WAL event. Apart from the - * watcher itself, it takes a bit mask of events. - * Events are described in wal_event enum. + * upon receiving a WAL event. It takes an object + * of type wal_watcher_msg that stores a pointer + * to the watcher and information about the event. * @param process_cb Function called to process cbus messages * while the watcher is being attached or NULL * if the cbus loop is running elsewhere. */ void wal_set_watcher(struct wal_watcher *watcher, const char *name, - void (*watcher_cb)(struct wal_watcher *, unsigned events), + void (*watcher_cb)(struct wal_watcher_msg *), void (*process_cb)(struct cbus_endpoint *)); /** diff --git a/src/box/xlog.h b/src/box/xlog.h index d08ef16f..cb5207c1 100644 --- a/src/box/xlog.h +++ b/src/box/xlog.h @@ -203,6 +203,21 @@ void xdir_collect_inprogress(struct xdir *xdir); /** + * Return LSN and vclock (unless @vclock is NULL) of the oldest + * file in a directory or -1 if the directory is empty. + */ +static inline int64_t +xdir_first_vclock(struct xdir *xdir, struct vclock *vclock) +{ + struct vclock *first = vclockset_first(&xdir->index); + if (first == NULL) + return -1; + if (vclock != NULL) + vclock_copy(vclock, first); + return vclock_sum(first); +} + +/** * Return LSN and vclock (unless @vclock is NULL) of the newest * file in a directory or -1 if the directory is empty. */ -- 2.11.0