[Tarantool-patches] [PATCH 3/4] relay: introduce relay_lsn_watcher

Serge Petrenko sergepetrenko at tarantool.org
Thu Dec 10 23:55:13 MSK 2020


Add a list of lsn watchers to relay.

Relay_lsn_watcher is a structure that lives in tx thread and monitors
replica's progress in applying rows coming from a given replica id.

The watcher fires once relay reports that replica has acked the target lsn
for the given replica id.

The watcher is owned by some tx fiber, which typically sleeps until the
watcher wakes it up. Besides waking the waiting fiber up, the watcher may
perform some work as dictated by the notify function.

Prerequisite #5435
---
 src/box/relay.cc | 73 ++++++++++++++++++++++++++++++++++++++++++++++--
 src/box/relay.h  | 44 +++++++++++++++++++++++++++++
 2 files changed, 114 insertions(+), 3 deletions(-)

diff --git a/src/box/relay.cc b/src/box/relay.cc
index f342a79dd..4014792a6 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -152,6 +152,8 @@ struct relay {
 		 * anonymous replica, for example.
 		 */
 		bool is_raft_enabled;
+		/** A list of lsn watchers registered for this relay. */
+		struct rlist lsn_watchers;
 	} tx;
 };
 
@@ -200,6 +202,7 @@ relay_new(struct replica *replica)
 	fiber_cond_create(&relay->reader_cond);
 	diag_create(&relay->diag);
 	stailq_create(&relay->pending_gc);
+	rlist_create(&relay->tx.lsn_watchers);
 	relay->state = RELAY_OFF;
 	return relay;
 }
@@ -394,6 +397,54 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock,
 	});
 }
 
+void
+relay_set_lsn_watcher(struct relay *relay, struct relay_lsn_watcher *watcher)
+{
+	rlist_add_tail_entry(&relay->tx.lsn_watchers, watcher, in_list);
+}
+
+void
+relay_lsn_watcher_create(struct relay_lsn_watcher *watcher, uint32_t replica_id,
+			 int64_t target_lsn, relay_lsn_watcher_f notify,
+			 relay_lsn_watcher_f destroy, void *data)
+{
+	watcher->replica_id = replica_id;
+	watcher->target_lsn = target_lsn;
+	watcher->notify = notify;
+	watcher->destroy = destroy;
+	watcher->data = data;
+	watcher->waiter = fiber();
+}
+
+/**
+ * Destroy the watcher.
+ * Wake the waiting fiber up, fire the on destroy callback and remove the
+ * watcher from the relay's watcher list.
+ */
+static void
+relay_lsn_watcher_destroy(struct relay_lsn_watcher *watcher)
+{
+	watcher->destroy(watcher->data);
+	fiber_wakeup(watcher->waiter);
+	rlist_del_entry(watcher, in_list);
+}
+
+/**
+ * Notify the watcher that the replica has advanced to the given vclock.
+ * In case target_lsn is hit for watcher's replica_id, fire the notify
+ * callback and destroy the watcher.
+ */
+static void
+relay_lsn_watcher_notify(struct relay_lsn_watcher *watcher,
+			 struct vclock *vclock)
+{
+	int64_t lsn = vclock_get(vclock, watcher->replica_id);
+	if (lsn >= watcher->target_lsn) {
+		watcher->notify(watcher->data);
+		relay_lsn_watcher_destroy(watcher);
+	}
+}
+
 /**
  * The message which updated tx thread with a new vclock has returned back
  * to the relay.
@@ -411,7 +462,8 @@ static void
 tx_status_update(struct cmsg *msg)
 {
 	struct relay_status_msg *status = (struct relay_status_msg *)msg;
-	vclock_copy(&status->relay->tx.vclock, &status->vclock);
+	struct relay *relay = status->relay;
+	vclock_copy(&relay->tx.vclock, &status->vclock);
 	/*
 	 * Let pending synchronous transactions know, which of
 	 * them were successfully sent to the replica. Acks are
@@ -420,14 +472,18 @@ tx_status_update(struct cmsg *msg)
 	 * for master's CONFIRM message instead.
 	 */
 	if (txn_limbo.owner_id == instance_id) {
-		txn_limbo_ack(&txn_limbo, status->relay->replica->id,
+		txn_limbo_ack(&txn_limbo, relay->replica->id,
 			      vclock_get(&status->vclock, instance_id));
 	}
+	struct relay_lsn_watcher *watcher, *tmp;
+	rlist_foreach_entry_safe(watcher, &relay->tx.lsn_watchers, in_list, tmp) {
+		relay_lsn_watcher_notify(watcher, &status->vclock);
+	}
 	static const struct cmsg_hop route[] = {
 		{relay_status_update, NULL}
 	};
 	cmsg_init(msg, route);
-	cpipe_push(&status->relay->relay_pipe, msg);
+	cpipe_push(&relay->relay_pipe, msg);
 }
 
 /**
@@ -604,6 +660,17 @@ tx_notify_is_relay_running(struct cmsg *base)
 	/* Never subscribe anonymous replicas to raft updates. */
 	if (!msg->relay->replica->anon)
 		msg->relay->tx.is_raft_enabled = msg->is_running;
+	/*
+	 * Notify everyone waiting for a specific row to be replicated through
+	 * this relay there's nothing to wait for anymore.
+	 */
+	if (!msg->is_running) {
+		struct relay_lsn_watcher *watcher, *tmp;
+		rlist_foreach_entry_safe(watcher, &msg->relay->tx.lsn_watchers,
+					 in_list, tmp) {
+			relay_lsn_watcher_destroy(watcher);
+		}
+	}
 }
 
 /** Relay thread part of the relay is running notification, second hop. */
diff --git a/src/box/relay.h b/src/box/relay.h
index b32e2ea2a..da2e3498a 100644
--- a/src/box/relay.h
+++ b/src/box/relay.h
@@ -32,6 +32,7 @@
  */
 
 #include <stdint.h>
+#include "small/rlist.h"
 
 #if defined(__cplusplus)
 extern "C" {
@@ -134,4 +135,47 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
 		struct vclock *replica_vclock, uint32_t replica_version_id,
 		uint32_t replica_id_filter);
 
+/**
+ * A callback invoked once lsn watcher sees the replica has reached the target
+ * lsn for the given replica id.
+ */
+typedef void (*relay_lsn_watcher_f)(void *data);
+
+/**
+ * A relay watcher which notifies tx via calling a given function once the
+ * replica confirms it has received the rows from replica_id up to target_lsn.
+ */
+struct relay_lsn_watcher {
+	/** A replica_id to monitor rows from. */
+	uint32_t replica_id;
+	/** The lsn to wait for. */
+	int64_t target_lsn;
+	/**
+	 * A callback invoked in the tx thread once the watcher sees that
+	 * replica has reached the target lsn.
+	 */
+	relay_lsn_watcher_f notify;
+	/**
+	 * A callback fired once the relay thread exits to notify the waiter
+	 * there's nothing to wait for anymore.
+	 */
+	relay_lsn_watcher_f destroy;
+	/** Data passed to \a notify and \a destroy. */
+	void *data;
+	/** A fiber waiting for this watcher to fire. */
+	struct fiber *waiter;
+	/** A link in relay's watcher list. */
+	struct rlist in_list;
+};
+
+/** Initialize a pre-allocated lsn watcher. */
+void
+relay_lsn_watcher_create(struct relay_lsn_watcher *watcher, uint32_t replica_id,
+			 int64_t target_lsn, relay_lsn_watcher_f notify,
+			 relay_lsn_watcher_f destroy, void *data);
+
+/** Attach the given lsn watcher to the relay. */
+void
+relay_set_lsn_watcher(struct relay *relay, struct relay_lsn_watcher *watcher);
+
 #endif /* TARANTOOL_REPLICATION_RELAY_H_INCLUDED */
-- 
2.24.3 (Apple Git-128)



More information about the Tarantool-patches mailing list