[tarantool-patches] [PATCH 6/7] Refactoring: remove wal_watcher routines

Georgy Kirichenko georgy at tarantool.org
Tue Aug 13 09:27:44 MSK 2019


Since relay now uses WAL memory replication, this facility is no longer
needed.
---
 src/box/relay.cc |   2 -
 src/box/wal.c    | 127 -----------------------------------------------
 src/box/wal.h    |  68 -------------------------
 3 files changed, 197 deletions(-)

diff --git a/src/box/relay.cc b/src/box/relay.cc
index 05fc0f691..1c2332878 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -82,8 +82,6 @@ struct relay {
 	struct vclock stop_vclock;
 	/** Remote replica */
 	struct replica *replica;
-	/** WAL event watcher. */
-	struct wal_watcher wal_watcher;
 	/** Relay diagnostics. */
 	struct diag diag;
 	/** Vclock recieved from replica. */
diff --git a/src/box/wal.c b/src/box/wal.c
index 0457f3d46..2b9a3c805 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -724,9 +724,6 @@ wal_set_checkpoint_threshold(int64_t threshold)
 	fiber_set_cancellable(cancellable);
 }
 
-static void
-wal_notify_watchers(struct wal_writer *writer, unsigned events);
-
 /**
  * If there is no current WAL, try to open it, and close the
  * previous WAL. We close the previous WAL only after opening
@@ -771,7 +768,6 @@ wal_opt_rotate(struct wal_writer *writer)
 	vclock_copy(&writer->prev_vclock, &writer->vclock);
 
 	wal_notify_log_action(writer, WAL_LOG_OPEN);
-	wal_notify_watchers(writer, WAL_EVENT_ROTATE);
 	return 0;
 }
 
@@ -1123,7 +1119,6 @@ done:
 		wal_writer_begin_rollback(writer);
 	}
 	fiber_gc();
-	wal_notify_watchers(writer, WAL_EVENT_WRITE);
 }
 
 /*
@@ -1364,128 +1359,6 @@ wal_rotate_vy_log()
 	fiber_set_cancellable(cancellable);
 }
 
-static void
-wal_watcher_notify(struct wal_watcher *watcher, unsigned events)
-{
-	assert(!rlist_empty(&watcher->next));
-
-	struct wal_watcher_msg *msg = &watcher->msg;
-	if (msg->cmsg.route != NULL) {
-		/*
-		 * If the notification message is still en route,
-		 * mark the watcher to resend it as soon as it
-		 * returns to WAL so as not to lose any events.
-		 */
-		watcher->pending_events |= events;
-		return;
-	}
-
-	msg->events = events;
-	cmsg_init(&msg->cmsg, watcher->route);
-	cpipe_push(&watcher->watcher_pipe, &msg->cmsg);
-}
-
-static void
-wal_watcher_notify_perform(struct cmsg *cmsg)
-{
-	struct wal_watcher_msg *msg = (struct wal_watcher_msg *) cmsg;
-	struct wal_watcher *watcher = msg->watcher;
-	unsigned events = msg->events;
-
-	watcher->cb(watcher, events);
-}
-
-static void
-wal_watcher_notify_complete(struct cmsg *cmsg)
-{
-	struct wal_watcher_msg *msg = (struct wal_watcher_msg *) cmsg;
-	struct wal_watcher *watcher = msg->watcher;
-
-	cmsg->route = NULL;
-
-	if (rlist_empty(&watcher->next)) {
-		/* The watcher is about to be destroyed. */
-		return;
-	}
-
-	if (watcher->pending_events != 0) {
-		/*
-		 * Resend the message if we got notified while
-		 * it was en route, see wal_watcher_notify().
-		 */
-		wal_watcher_notify(watcher, watcher->pending_events);
-		watcher->pending_events = 0;
-	}
-}
-
-static void
-wal_watcher_attach(void *arg)
-{
-	struct wal_watcher *watcher = (struct wal_watcher *) arg;
-	struct wal_writer *writer = &wal_writer_singleton;
-
-	assert(rlist_empty(&watcher->next));
-	rlist_add_tail_entry(&writer->watchers, watcher, next);
-
-	/*
-	 * Notify the watcher right after registering it
-	 * so that it can process existing WALs.
-	 */
-	wal_watcher_notify(watcher, WAL_EVENT_ROTATE);
-}
-
-static void
-wal_watcher_detach(void *arg)
-{
-	struct wal_watcher *watcher = (struct wal_watcher *) arg;
-
-	assert(!rlist_empty(&watcher->next));
-	rlist_del_entry(watcher, next);
-}
-
-void
-wal_set_watcher(struct wal_watcher *watcher, const char *name,
-		void (*watcher_cb)(struct wal_watcher *, unsigned events),
-		void (*process_cb)(struct cbus_endpoint *))
-{
-	assert(journal_is_initialized(&wal_writer_singleton.base));
-
-	rlist_create(&watcher->next);
-	watcher->cb = watcher_cb;
-	watcher->msg.watcher = watcher;
-	watcher->msg.events = 0;
-	watcher->msg.cmsg.route = NULL;
-	watcher->pending_events = 0;
-
-	assert(lengthof(watcher->route) == 2);
-	watcher->route[0] = (struct cmsg_hop)
-		{ wal_watcher_notify_perform, &watcher->wal_pipe };
-	watcher->route[1] = (struct cmsg_hop)
-		{ wal_watcher_notify_complete, NULL };
-
-	  cbus_pair("wal", name, &watcher->wal_pipe, &watcher->watcher_pipe,
-		  wal_watcher_attach, watcher, process_cb);
-}
-
-void
-wal_clear_watcher(struct wal_watcher *watcher,
-		  void (*process_cb)(struct cbus_endpoint *))
-{
-	assert(journal_is_initialized(&wal_writer_singleton.base));
-
-	cbus_unpair(&watcher->wal_pipe, &watcher->watcher_pipe,
-		    wal_watcher_detach, watcher, process_cb);
-}
-
-static void
-wal_notify_watchers(struct wal_writer *writer, unsigned events)
-{
-	struct wal_watcher *watcher;
-	rlist_foreach_entry(watcher, &writer->watchers, next)
-		wal_watcher_notify(watcher, events);
-}
-
-
 /**
  * After fork, the WAL writer thread disappears.
  * Make sure that atexit() handlers in the child do
diff --git a/src/box/wal.h b/src/box/wal.h
index bd298cebe..ab24a93cb 100644
--- a/src/box/wal.h
+++ b/src/box/wal.h
@@ -91,12 +91,6 @@ wal_enable(void);
 void
 wal_free(void);
 
-struct wal_watcher_msg {
-	struct cmsg cmsg;
-	struct wal_watcher *watcher;
-	unsigned events;
-};
-
 enum wal_event {
 	/** A row is written to the current WAL. */
 	WAL_EVENT_WRITE		= (1 << 0),
@@ -104,68 +98,6 @@ enum wal_event {
 	WAL_EVENT_ROTATE	= (1 << 1),
 };
 
-struct wal_watcher {
-	/** Link in wal_writer::watchers. */
-	struct rlist next;
-	/** The watcher callback function. */
-	void (*cb)(struct wal_watcher *, unsigned events);
-	/** Pipe from the watcher to WAL. */
-	struct cpipe wal_pipe;
-	/** Pipe from WAL to the watcher. */
-	struct cpipe watcher_pipe;
-	/** Cbus route used for notifying the watcher. */
-	struct cmsg_hop route[2];
-	/** Message sent to notify the watcher. */
-	struct wal_watcher_msg msg;
-	/**
-	 * Bit mask of WAL events that happened while
-	 * the notification message was en route.
-	 * It indicates that the message must be resend
-	 * right upon returning to WAL.
-	 */
-	unsigned pending_events;
-};
-
-/**
- * Subscribe to WAL events.
- *
- * The caller will receive a notification after a WAL write with
- * unspecified but reasonable latency. The first notification is
- * sent right after registering the watcher so that the caller
- * can process WALs written before the function was called.
- *
- * Note WAL notifications are delivered via cbus hence the caller
- * must have set up the cbus endpoint and started the event loop.
- * Alternatively, one can pass a callback invoking cbus_process()
- * to this function.
- *
- * @param watcher     WAL watcher to register.
- * @param name        Name of the cbus endpoint at the caller's cord.
- * @param watcher_cb  Callback to invoke from the caller's cord
- *                    upon receiving a WAL event. Apart from the
- *                    watcher itself, it takes a bit mask of events.
- *                    Events are described in wal_event enum.
- * @param process_cb  Function called to process cbus messages
- *                    while the watcher is being attached or NULL
- *                    if the cbus loop is running elsewhere.
- */
-void
-wal_set_watcher(struct wal_watcher *watcher, const char *name,
-		void (*watcher_cb)(struct wal_watcher *, unsigned events),
-		void (*process_cb)(struct cbus_endpoint *));
-
-/**
- * Unsubscribe from WAL events.
- *
- * @param watcher     WAL watcher to unregister.
- * @param process_cb  Function invoked to process cbus messages
- *                    while the watcher is being detached or NULL
- *                    if the cbus loop is running elsewhere.
- */
-void
-wal_clear_watcher(struct wal_watcher *watcher,
-		  void (*process_cb)(struct cbus_endpoint *));
-
 void
 wal_atfork();
 
-- 
2.22.0





More information about the Tarantool-patches mailing list