From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from localhost (localhost [127.0.0.1]) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id C39C82885D for ; Tue, 13 Aug 2019 02:27:54 -0400 (EDT) Received: from turing.freelists.org ([127.0.0.1]) by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id hgEJJkOYfK-N for ; Tue, 13 Aug 2019 02:27:54 -0400 (EDT) Received: from smtp39.i.mail.ru (smtp39.i.mail.ru [94.100.177.99]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id 5A62528857 for ; Tue, 13 Aug 2019 02:27:54 -0400 (EDT) From: Georgy Kirichenko Subject: [tarantool-patches] [PATCH 6/7] Refactoring: remove wal_watcher routines Date: Tue, 13 Aug 2019 09:27:44 +0300 Message-Id: In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Sender: tarantool-patches-bounce@freelists.org Errors-to: tarantool-patches-bounce@freelists.org Reply-To: tarantool-patches@freelists.org List-Help: List-Unsubscribe: List-software: Ecartis version 1.0.0 List-Id: tarantool-patches List-Subscribe: List-Owner: List-post: List-Archive: To: tarantool-patches@freelists.org Cc: Georgy Kirichenko As relay uses wal memory replication there is no need in this facility yet. --- src/box/relay.cc | 2 - src/box/wal.c | 127 ----------------------------------------------- src/box/wal.h | 68 ------------------------- 3 files changed, 197 deletions(-) diff --git a/src/box/relay.cc b/src/box/relay.cc index 05fc0f691..1c2332878 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -82,8 +82,6 @@ struct relay { struct vclock stop_vclock; /** Remote replica */ struct replica *replica; - /** WAL event watcher. */ - struct wal_watcher wal_watcher; /** Relay diagnostics. */ struct diag diag; /** Vclock recieved from replica. */ diff --git a/src/box/wal.c b/src/box/wal.c index 0457f3d46..2b9a3c805 100644 --- a/src/box/wal.c +++ b/src/box/wal.c @@ -724,9 +724,6 @@ wal_set_checkpoint_threshold(int64_t threshold) fiber_set_cancellable(cancellable); } -static void -wal_notify_watchers(struct wal_writer *writer, unsigned events); - /** * If there is no current WAL, try to open it, and close the * previous WAL. We close the previous WAL only after opening @@ -771,7 +768,6 @@ wal_opt_rotate(struct wal_writer *writer) vclock_copy(&writer->prev_vclock, &writer->vclock); wal_notify_log_action(writer, WAL_LOG_OPEN); - wal_notify_watchers(writer, WAL_EVENT_ROTATE); return 0; } @@ -1123,7 +1119,6 @@ done: wal_writer_begin_rollback(writer); } fiber_gc(); - wal_notify_watchers(writer, WAL_EVENT_WRITE); } /* @@ -1364,128 +1359,6 @@ wal_rotate_vy_log() fiber_set_cancellable(cancellable); } -static void -wal_watcher_notify(struct wal_watcher *watcher, unsigned events) -{ - assert(!rlist_empty(&watcher->next)); - - struct wal_watcher_msg *msg = &watcher->msg; - if (msg->cmsg.route != NULL) { - /* - * If the notification message is still en route, - * mark the watcher to resend it as soon as it - * returns to WAL so as not to lose any events. - */ - watcher->pending_events |= events; - return; - } - - msg->events = events; - cmsg_init(&msg->cmsg, watcher->route); - cpipe_push(&watcher->watcher_pipe, &msg->cmsg); -} - -static void -wal_watcher_notify_perform(struct cmsg *cmsg) -{ - struct wal_watcher_msg *msg = (struct wal_watcher_msg *) cmsg; - struct wal_watcher *watcher = msg->watcher; - unsigned events = msg->events; - - watcher->cb(watcher, events); -} - -static void -wal_watcher_notify_complete(struct cmsg *cmsg) -{ - struct wal_watcher_msg *msg = (struct wal_watcher_msg *) cmsg; - struct wal_watcher *watcher = msg->watcher; - - cmsg->route = NULL; - - if (rlist_empty(&watcher->next)) { - /* The watcher is about to be destroyed. */ - return; - } - - if (watcher->pending_events != 0) { - /* - * Resend the message if we got notified while - * it was en route, see wal_watcher_notify(). - */ - wal_watcher_notify(watcher, watcher->pending_events); - watcher->pending_events = 0; - } -} - -static void -wal_watcher_attach(void *arg) -{ - struct wal_watcher *watcher = (struct wal_watcher *) arg; - struct wal_writer *writer = &wal_writer_singleton; - - assert(rlist_empty(&watcher->next)); - rlist_add_tail_entry(&writer->watchers, watcher, next); - - /* - * Notify the watcher right after registering it - * so that it can process existing WALs. - */ - wal_watcher_notify(watcher, WAL_EVENT_ROTATE); -} - -static void -wal_watcher_detach(void *arg) -{ - struct wal_watcher *watcher = (struct wal_watcher *) arg; - - assert(!rlist_empty(&watcher->next)); - rlist_del_entry(watcher, next); -} - -void -wal_set_watcher(struct wal_watcher *watcher, const char *name, - void (*watcher_cb)(struct wal_watcher *, unsigned events), - void (*process_cb)(struct cbus_endpoint *)) -{ - assert(journal_is_initialized(&wal_writer_singleton.base)); - - rlist_create(&watcher->next); - watcher->cb = watcher_cb; - watcher->msg.watcher = watcher; - watcher->msg.events = 0; - watcher->msg.cmsg.route = NULL; - watcher->pending_events = 0; - - assert(lengthof(watcher->route) == 2); - watcher->route[0] = (struct cmsg_hop) - { wal_watcher_notify_perform, &watcher->wal_pipe }; - watcher->route[1] = (struct cmsg_hop) - { wal_watcher_notify_complete, NULL }; - - cbus_pair("wal", name, &watcher->wal_pipe, &watcher->watcher_pipe, - wal_watcher_attach, watcher, process_cb); -} - -void -wal_clear_watcher(struct wal_watcher *watcher, - void (*process_cb)(struct cbus_endpoint *)) -{ - assert(journal_is_initialized(&wal_writer_singleton.base)); - - cbus_unpair(&watcher->wal_pipe, &watcher->watcher_pipe, - wal_watcher_detach, watcher, process_cb); -} - -static void -wal_notify_watchers(struct wal_writer *writer, unsigned events) -{ - struct wal_watcher *watcher; - rlist_foreach_entry(watcher, &writer->watchers, next) - wal_watcher_notify(watcher, events); -} - - /** * After fork, the WAL writer thread disappears. * Make sure that atexit() handlers in the child do diff --git a/src/box/wal.h b/src/box/wal.h index bd298cebe..ab24a93cb 100644 --- a/src/box/wal.h +++ b/src/box/wal.h @@ -91,12 +91,6 @@ wal_enable(void); void wal_free(void); -struct wal_watcher_msg { - struct cmsg cmsg; - struct wal_watcher *watcher; - unsigned events; -}; - enum wal_event { /** A row is written to the current WAL. */ WAL_EVENT_WRITE = (1 << 0), @@ -104,68 +98,6 @@ enum wal_event { WAL_EVENT_ROTATE = (1 << 1), }; -struct wal_watcher { - /** Link in wal_writer::watchers. */ - struct rlist next; - /** The watcher callback function. */ - void (*cb)(struct wal_watcher *, unsigned events); - /** Pipe from the watcher to WAL. */ - struct cpipe wal_pipe; - /** Pipe from WAL to the watcher. */ - struct cpipe watcher_pipe; - /** Cbus route used for notifying the watcher. */ - struct cmsg_hop route[2]; - /** Message sent to notify the watcher. */ - struct wal_watcher_msg msg; - /** - * Bit mask of WAL events that happened while - * the notification message was en route. - * It indicates that the message must be resend - * right upon returning to WAL. - */ - unsigned pending_events; -}; - -/** - * Subscribe to WAL events. - * - * The caller will receive a notification after a WAL write with - * unspecified but reasonable latency. The first notification is - * sent right after registering the watcher so that the caller - * can process WALs written before the function was called. - * - * Note WAL notifications are delivered via cbus hence the caller - * must have set up the cbus endpoint and started the event loop. - * Alternatively, one can pass a callback invoking cbus_process() - * to this function. - * - * @param watcher WAL watcher to register. - * @param name Name of the cbus endpoint at the caller's cord. - * @param watcher_cb Callback to invoke from the caller's cord - * upon receiving a WAL event. Apart from the - * watcher itself, it takes a bit mask of events. - * Events are described in wal_event enum. - * @param process_cb Function called to process cbus messages - * while the watcher is being attached or NULL - * if the cbus loop is running elsewhere. - */ -void -wal_set_watcher(struct wal_watcher *watcher, const char *name, - void (*watcher_cb)(struct wal_watcher *, unsigned events), - void (*process_cb)(struct cbus_endpoint *)); - -/** - * Unsubscribe from WAL events. - * - * @param watcher WAL watcher to unregister. - * @param process_cb Function invoked to process cbus messages - * while the watcher is being detached or NULL - * if the cbus loop is running elsewhere. - */ -void -wal_clear_watcher(struct wal_watcher *watcher, - void (*process_cb)(struct cbus_endpoint *)); - void wal_atfork(); -- 2.22.0