[Tarantool-patches] [PATCH v2 08/11] raft: relay status updates to followers

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Sun Sep 20 20:17:43 MSK 2020


Hi! Consider my fixes on top of the branch for this commit.

====================

Before this patch sometimes an attempt to push a Raft update into
a relay thread could crash. That happened because relay status
being FOLLOW does not mean, that the relay thread is running, and
its endpoint is initialized. FOLLOW state is installed before the
thread is started. Until the thread is started and initialized its
endpoint, the relay->relay_pipe is garbage. An attempt to push a
message into it from relay_push_raft() led to a crash.

This patch introduces a flag relay->is_raft_enabled. When it is
false, nothing can be pushed to the relay thread. It is installed
to true by the thread itself after it initializes its endpoint.

It is worth leaving a few words why a simple cbus_call() didn't
work, called from the relay thread to call a function in the TX
thread to set the flag.

The problem with cbus_call() is that it assumes the caller
thread's scheduler fiber will call cbus_process() on each wakeup.

In the relay thread the scheduler fiber only wakes up the main
relay fiber, which in turn may call cbus_process() some time
later. So in this patch the flag setting is implemented a bit
differently. With manual calls of cbus_process() and
fiber_yield() to implement kind of 'cbus_call()', but specially
for the relay thread.

The Raft test still may crash, but in a new place, somewhere in
applier_handle_raft when touches vclock.
---
 src/box/raft.c   |  8 ++---
 src/box/relay.cc | 88 ++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 90 insertions(+), 6 deletions(-)

diff --git a/src/box/raft.c b/src/box/raft.c
index b3ab94bd7..7b7ef9c1c 100644
--- a/src/box/raft.c
+++ b/src/box/raft.c
@@ -955,12 +955,8 @@ raft_cfg_death_timeout(void)
 void
 raft_broadcast(const struct raft_request *req)
 {
-	replicaset_foreach(replica) {
-		if (replica->relay != NULL && replica->id != REPLICA_ID_NIL &&
-		    relay_get_state(replica->relay) == RELAY_FOLLOW) {
-			relay_push_raft(replica->relay, req);
-		}
-	}
+	replicaset_foreach(replica)
+		relay_push_raft(replica->relay, req);
 }
 
 void
diff --git a/src/box/relay.cc b/src/box/relay.cc
index d63711600..096f455a1 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -146,6 +146,12 @@ struct relay {
 		alignas(CACHELINE_SIZE)
 		/** Known relay vclock. */
 		struct vclock vclock;
+		/**
+		 * True if the relay needs Raft updates. It can live fine
+		 * without sending Raft updates, if it is a relay to an
+		 * anonymous replica, for example.
+		 */
+		bool is_raft_enabled;
 	} tx;
 };
 
@@ -573,6 +579,74 @@ relay_send_heartbeat(struct relay *relay)
 	}
 }
 
+/** A message to set Raft enabled flag in TX thread from a relay thread. */
+struct relay_is_raft_enabled_msg {
+	/** Base cbus message. */
+	struct cmsg base;
+	/**
+	 * First hop - TX thread, second hop - a relay thread, to notify about
+	 * the flag being set.
+	 */
+	struct cmsg_hop route[2];
+	/** Relay pointer to set the flag in. */
+	struct relay *relay;
+	/** New flag value. */
+	bool value;
+	/** Flag to wait for the flag being set, in a relay thread. */
+	bool is_finished;
+};
+
+/** TX thread part of the Raft flag setting, first hop. */
+static void
+tx_set_is_raft_enabled(struct cmsg *base)
+{
+	struct relay_is_raft_enabled_msg *msg =
+		(struct relay_is_raft_enabled_msg *)base;
+	msg->relay->tx.is_raft_enabled = msg->value;
+}
+
+/** Relay thread part of the Raft flag setting, second hop. */
+static void
+relay_set_is_raft_enabled(struct cmsg *base)
+{
+	struct relay_is_raft_enabled_msg *msg =
+		(struct relay_is_raft_enabled_msg *)base;
+	msg->is_finished = true;
+}
+
+/**
+ * Set relay Raft enabled flag from a relay thread to be accessed by the TX
+ * thread.
+ */
+static void
+relay_send_is_raft_enabled(struct relay *relay,
+			   struct relay_is_raft_enabled_msg *msg, bool value)
+{
+	msg->route[0].f = tx_set_is_raft_enabled;
+	msg->route[0].pipe = &relay->relay_pipe;
+	msg->route[1].f = relay_set_is_raft_enabled;
+	msg->route[1].pipe = NULL;
+	msg->relay = relay;
+	msg->value = value;
+	msg->is_finished = false;
+	cmsg_init(&msg->base, msg->route);
+	cpipe_push(&relay->tx_pipe, &msg->base);
+	/*
+	 * cbus_call() can't be used, because it works only if the sender thread
+	 * is a simple cbus_process() loop. But the relay thread is not -
+	 * instead it calls cbus_process() manually when ready. And the thread
+	 * loop consists of the main fiber wakeup. So cbus_call() would just
+	 * hang, because cbus_process() wouldn't be called by the scheduler
+	 * fiber.
+	 */
+	while (!msg->is_finished) {
+		cbus_process(&relay->endpoint);
+		if (msg->is_finished)
+			break;
+		fiber_yield();
+	}
+}
+
 /**
  * A libev callback invoked when a relay client socket is ready
  * for read. This currently only happens when the client closes
@@ -593,6 +667,10 @@ relay_subscribe_f(va_list ap)
 	cbus_pair("tx", relay->endpoint.name, &relay->tx_pipe,
 		  &relay->relay_pipe, NULL, NULL, cbus_process);
 
+	struct relay_is_raft_enabled_msg raft_enabler;
+	if (!relay->replica->anon)
+		relay_send_is_raft_enabled(relay, &raft_enabler, true);
+
 	/*
 	 * Setup garbage collection trigger.
 	 * Not needed for anonymous replicas, since they
@@ -672,6 +750,9 @@ relay_subscribe_f(va_list ap)
 		cpipe_push(&relay->tx_pipe, &relay->status_msg.msg);
 	}
 
+	if (!relay->replica->anon)
+		relay_send_is_raft_enabled(relay, &raft_enabler, false);
+
 	/*
 	 * Log the error that caused the relay to break the loop.
 	 * Don't clear the error for status reporting.
@@ -821,6 +902,13 @@ relay_raft_msg_push(struct cmsg *base)
 void
 relay_push_raft(struct relay *relay, const struct raft_request *req)
 {
+	/*
+	 * Raft updates don't stack. They are thrown away if can't be pushed
+	 * now. This is fine, as long as relay's live much longer that the
+	 * timeouts in Raft are set.
+	 */
+	if (!relay->tx.is_raft_enabled)
+		return;
 	/*
 	 * XXX: the message should be preallocated. It should
 	 * work like Kharon in IProto. Relay should have 2 raft
-- 
2.21.1 (Apple Git-122.3)



More information about the Tarantool-patches mailing list