Tarantool development patches archive
 help / color / mirror / Atom feed
From: Serge Petrenko <sergepetrenko@tarantool.org>
To: v.shpilevoy@tarantool.org, cyrillos@gmail.com
Cc: tarantool-patches@dev.tarantool.org
Subject: [Tarantool-patches] [PATCH 3/4] relay: introduce relay_lsn_watcher
Date: Thu, 10 Dec 2020 23:55:13 +0300	[thread overview]
Message-ID: <449ae5e1cd4b4e22f55ff53a4404717c0229638a.1607633488.git.sergepetrenko@tarantool.org> (raw)
In-Reply-To: <cover.1607633488.git.sergepetrenko@tarantool.org>

Add a list of lsn watchers to relay.

Relay_lsn_watcher is a structure that lives in tx thread and monitors
replica's progress in applying rows coming from a given replica id.

The watcher fires once relay reports that replica has acked the target lsn
for the given replica id.

The watcher is owned by some tx fiber, which typically sleeps until the
watcher wakes it up. Besides waking the waiting fiber up, the watcher may
perform some work as dictated by the notify function.

Prerequisite #5435
---
 src/box/relay.cc | 73 ++++++++++++++++++++++++++++++++++++++++++++++--
 src/box/relay.h  | 44 +++++++++++++++++++++++++++++
 2 files changed, 114 insertions(+), 3 deletions(-)

diff --git a/src/box/relay.cc b/src/box/relay.cc
index f342a79dd..4014792a6 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -152,6 +152,8 @@ struct relay {
 		 * anonymous replica, for example.
 		 */
 		bool is_raft_enabled;
+		/** A list of lsn watchers registered for this relay. */
+		struct rlist lsn_watchers;
 	} tx;
 };
 
@@ -200,6 +202,7 @@ relay_new(struct replica *replica)
 	fiber_cond_create(&relay->reader_cond);
 	diag_create(&relay->diag);
 	stailq_create(&relay->pending_gc);
+	rlist_create(&relay->tx.lsn_watchers);
 	relay->state = RELAY_OFF;
 	return relay;
 }
@@ -394,6 +397,54 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock,
 	});
 }
 
+void
+relay_set_lsn_watcher(struct relay *relay, struct relay_lsn_watcher *watcher)
+{
+	rlist_add_tail_entry(&relay->tx.lsn_watchers, watcher, in_list);
+}
+
+void
+relay_lsn_watcher_create(struct relay_lsn_watcher *watcher, uint32_t replica_id,
+			 int64_t target_lsn, relay_lsn_watcher_f notify,
+			 relay_lsn_watcher_f destroy, void *data)
+{
+	watcher->replica_id = replica_id;
+	watcher->target_lsn = target_lsn;
+	watcher->notify = notify;
+	watcher->destroy = destroy;
+	watcher->data = data;
+	watcher->waiter = fiber();
+}
+
+/**
+ * Destroy the watcher.
+ * Wake the waiting fiber up, fire the on destroy callback and remove the
+ * watcher from the relay's watcher list.
+ */
+static void
+relay_lsn_watcher_destroy(struct relay_lsn_watcher *watcher)
+{
+	watcher->destroy(watcher->data);
+	fiber_wakeup(watcher->waiter);
+	rlist_del_entry(watcher, in_list);
+}
+
+/**
+ * Notify the watcher that the replica has advanced to the given vclock.
+ * In case target_lsn is hit for watcher's replica_id, fire the notify
+ * callback and destroy the watcher.
+ */
+static void
+relay_lsn_watcher_notify(struct relay_lsn_watcher *watcher,
+			 struct vclock *vclock)
+{
+	int64_t lsn = vclock_get(vclock, watcher->replica_id);
+	if (lsn >= watcher->target_lsn) {
+		watcher->notify(watcher->data);
+		relay_lsn_watcher_destroy(watcher);
+	}
+}
+
 /**
  * The message which updated tx thread with a new vclock has returned back
  * to the relay.
@@ -411,7 +462,8 @@ static void
 tx_status_update(struct cmsg *msg)
 {
 	struct relay_status_msg *status = (struct relay_status_msg *)msg;
-	vclock_copy(&status->relay->tx.vclock, &status->vclock);
+	struct relay *relay = status->relay;
+	vclock_copy(&relay->tx.vclock, &status->vclock);
 	/*
 	 * Let pending synchronous transactions know, which of
 	 * them were successfully sent to the replica. Acks are
@@ -420,14 +472,18 @@ tx_status_update(struct cmsg *msg)
 	 * for master's CONFIRM message instead.
 	 */
 	if (txn_limbo.owner_id == instance_id) {
-		txn_limbo_ack(&txn_limbo, status->relay->replica->id,
+		txn_limbo_ack(&txn_limbo, relay->replica->id,
 			      vclock_get(&status->vclock, instance_id));
 	}
+	struct relay_lsn_watcher *watcher, *tmp;
+	rlist_foreach_entry_safe(watcher, &relay->tx.lsn_watchers, in_list, tmp) {
+		relay_lsn_watcher_notify(watcher, &status->vclock);
+	}
 	static const struct cmsg_hop route[] = {
 		{relay_status_update, NULL}
 	};
 	cmsg_init(msg, route);
-	cpipe_push(&status->relay->relay_pipe, msg);
+	cpipe_push(&relay->relay_pipe, msg);
 }
 
 /**
@@ -604,6 +660,17 @@ tx_notify_is_relay_running(struct cmsg *base)
 	/* Never subscribe anonymous replicas to raft updates. */
 	if (!msg->relay->replica->anon)
 		msg->relay->tx.is_raft_enabled = msg->is_running;
+	/*
+	 * Notify everyone waiting for a specific row to be replicated through
+	 * this relay there's nothing to wait for anymore.
+	 */
+	if (!msg->is_running) {
+		struct relay_lsn_watcher *watcher, *tmp;
+		rlist_foreach_entry_safe(watcher, &msg->relay->tx.lsn_watchers,
+					 in_list, tmp) {
+			relay_lsn_watcher_destroy(watcher);
+		}
+	}
 }
 
 /** Relay thread part of the relay is running notification, second hop. */
diff --git a/src/box/relay.h b/src/box/relay.h
index b32e2ea2a..da2e3498a 100644
--- a/src/box/relay.h
+++ b/src/box/relay.h
@@ -32,6 +32,7 @@
  */
 
 #include <stdint.h>
+#include "small/rlist.h"
 
 #if defined(__cplusplus)
 extern "C" {
@@ -134,4 +135,47 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
 		struct vclock *replica_vclock, uint32_t replica_version_id,
 		uint32_t replica_id_filter);
 
+/**
+ * A callback invoked once lsn watcher sees the replica has reached the target
+ * lsn for the given replica id.
+ */
+typedef void (*relay_lsn_watcher_f)(void *data);
+
+/**
+ * A relay watcher which notifies tx via calling a given function once the
+ * replica confirms it has received the rows from replica_id up to target_lsn.
+ */
+struct relay_lsn_watcher {
+	/** A replica_id to monitor rows from. */
+	uint32_t replica_id;
+	/** The lsn to wait for. */
+	int64_t target_lsn;
+	/**
+	 * A callback invoked in the tx thread once the watcher sees that
+	 * replica has reached the target lsn.
+	 */
+	relay_lsn_watcher_f notify;
+	/**
+	 * A callback fired once the relay thread exits to notify the waiter
+	 * there's nothing to wait for anymore.
+	 */
+	relay_lsn_watcher_f destroy;
+	/** Data passed to \a notify and \a destroy. */
+	void *data;
+	/** A fiber waiting for this watcher to fire. */
+	struct fiber *waiter;
+	/** A link in relay's watcher list. */
+	struct rlist in_list;
+};
+
+/** Initialize a pre-allocated lsn watcher. */
+void
+relay_lsn_watcher_create(struct relay_lsn_watcher *watcher, uint32_t replica_id,
+			 int64_t target_lsn, relay_lsn_watcher_f notify,
+			 relay_lsn_watcher_f destroy, void *data);
+
+/** Attach the given lsn watcher to the relay. */
+void
+relay_set_lsn_watcher(struct relay *relay, struct relay_lsn_watcher *watcher);
+
 #endif /* TARANTOOL_REPLICATION_RELAY_H_INCLUDED */
-- 
2.24.3 (Apple Git-128)

  parent reply	other threads:[~2020-12-10 20:55 UTC|newest]

Thread overview: 17+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2020-12-10 20:55 [Tarantool-patches] [PATCH 0/4] make clear_synchro_queue commit everything Serge Petrenko
2020-12-10 20:55 ` [Tarantool-patches] [PATCH 1/4] box: add a single execution guard to clear_synchro_queue Serge Petrenko
2020-12-17 21:43   ` Vladislav Shpilevoy
2020-12-21 10:18     ` Serge Petrenko
2020-12-21 17:11       ` Vladislav Shpilevoy
2020-12-23 12:01         ` Serge Petrenko
2020-12-10 20:55 ` [Tarantool-patches] [PATCH 2/4] relay: rename is_raft_enabled message to relay_is_running Serge Petrenko
2020-12-17 21:43   ` Vladislav Shpilevoy
2020-12-23 12:01     ` Serge Petrenko
2020-12-10 20:55 ` Serge Petrenko [this message]
2020-12-17 21:43   ` [Tarantool-patches] [PATCH 3/4] relay: introduce relay_lsn_watcher Vladislav Shpilevoy
     [not found]     ` <4b7f4fc1-6d48-4332-c432-1eeb0b28c016@tarantool.org>
2020-12-23 12:03       ` Serge Petrenko
2020-12-10 20:55 ` [Tarantool-patches] [PATCH 4/4] box: rework clear_synchro_queue to commit everything Serge Petrenko
2020-12-17 21:43   ` Vladislav Shpilevoy
2020-12-23 12:04     ` Serge Petrenko
2020-12-11  7:15 ` [Tarantool-patches] [PATCH 0/4] make clear_synchro_queue " Serge Petrenko
2020-12-11  9:19 ` Serge Petrenko

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=449ae5e1cd4b4e22f55ff53a4404717c0229638a.1607633488.git.sergepetrenko@tarantool.org \
    --to=sergepetrenko@tarantool.org \
    --cc=cyrillos@gmail.com \
    --cc=tarantool-patches@dev.tarantool.org \
    --cc=v.shpilevoy@tarantool.org \
    --subject='Re: [Tarantool-patches] [PATCH 3/4] relay: introduce relay_lsn_watcher' \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox