[Tarantool-patches] [PATCH v2 08/11] raft: relay status updates to followers

Serge Petrenko sergepetrenko at tarantool.org
Mon Sep 21 10:13:19 MSK 2020


20.09.2020 20:17, Vladislav Shpilevoy wrote:
> Hi! Consider my fixes on top of the branch for this commit.
>
> ====================
>
> Before this patch, an attempt to push a Raft update into a relay
> thread could sometimes crash. That happened because the relay status
> being FOLLOW does not mean that the relay thread is running and its
> endpoint is initialized: the FOLLOW state is set before the thread is
> started. Until the thread has started and initialized its endpoint,
> relay->relay_pipe contains garbage, so an attempt to push a message
> into it from relay_push_raft() led to a crash.
>
> This patch introduces a flag, relay->tx.is_raft_enabled. While it is
> false, nothing is pushed to the relay thread. The relay thread itself
> sets it to true after it initializes its endpoint (only for
> non-anonymous replicas), and resets it to false when it stops.
>
> It is worth explaining why a simple cbus_call() from the relay
> thread, invoking a function in the TX thread to set the flag, didn't
> work.
>
> The problem with cbus_call() is that it assumes the caller
> thread's scheduler fiber will call cbus_process() on each wakeup.
>
> In the relay thread, the scheduler fiber only wakes up the main
> relay fiber, which in turn may call cbus_process() some time
> later. So in this patch the flag is set differently: manual calls
> of cbus_process() and fiber_yield() implement a kind of
> cbus_call() specialized for the relay thread.
>
> The Raft test may still crash, but in a new place: somewhere in
> applier_handle_raft(), when it touches the vclock.
> ---
>   src/box/raft.c   |  8 ++---
>   src/box/relay.cc | 88 ++++++++++++++++++++++++++++++++++++++++++++++++
>   2 files changed, 90 insertions(+), 6 deletions(-)
>
> diff --git a/src/box/raft.c b/src/box/raft.c
> index b3ab94bd7..7b7ef9c1c 100644
> --- a/src/box/raft.c
> +++ b/src/box/raft.c
> @@ -955,12 +955,8 @@ raft_cfg_death_timeout(void)
>   void
>   raft_broadcast(const struct raft_request *req)
>   {
> -	replicaset_foreach(replica) {
> -		if (replica->relay != NULL && replica->id != REPLICA_ID_NIL &&
> -		    relay_get_state(replica->relay) == RELAY_FOLLOW) {
> -			relay_push_raft(replica->relay, req);
> -		}
> -	}
> +	replicaset_foreach(replica) /* Filtered inside relay_push_raft(). */
> +		relay_push_raft(replica->relay, req);
>   }
>   
>   void
> diff --git a/src/box/relay.cc b/src/box/relay.cc
> index d63711600..096f455a1 100644
> --- a/src/box/relay.cc
> +++ b/src/box/relay.cc
> @@ -146,6 +146,12 @@ struct relay {
>   		alignas(CACHELINE_SIZE)
>   		/** Known relay vclock. */
>   		struct vclock vclock;
> +		/**
> +		 * True if the relay needs Raft updates. It can live fine
> +		 * without sending Raft updates, if it is a relay to an
> +		 * anonymous replica, for example.
> +		 */
> +		bool is_raft_enabled;
>   	} tx;
>   };
>   
> @@ -573,6 +579,74 @@ relay_send_heartbeat(struct relay *relay)
>   	}
>   }
>   
> +/** Message from a relay thread asking TX to set relay->tx.is_raft_enabled. */
> +struct relay_is_raft_enabled_msg {
> +	/** Base cbus message. */
> +	struct cmsg base;
> +	/**
> +	 * Route: the first hop sets the flag in the TX thread, the second
> +	 * hop returns to the relay thread to report that it is done.
> +	 */
> +	struct cmsg_hop route[2];
> +	/** Relay whose tx.is_raft_enabled is updated. */
> +	struct relay *relay;
> +	/** Value to store in relay->tx.is_raft_enabled. */
> +	bool value;
> +	/** Set to true by the second hop; the relay thread waits for it. */
> +	bool is_finished;
> +};
> +
> +/** First hop, executed in the TX thread: store the new flag value. */
> +static void
> +tx_set_is_raft_enabled(struct cmsg *base)
> +{
> +	struct relay_is_raft_enabled_msg *msg =
> +		(struct relay_is_raft_enabled_msg *)base;
> +	msg->relay->tx.is_raft_enabled = msg->value;
> +}
> +
> +/** Second hop, executed in the relay thread: mark the message as done. */
> +static void
> +relay_set_is_raft_enabled(struct cmsg *base)
> +{
> +	struct relay_is_raft_enabled_msg *msg =
> +		(struct relay_is_raft_enabled_msg *)base;
> +	msg->is_finished = true;
> +}
> +
> +/**
> + * Set relay->tx.is_raft_enabled to @a value from a relay thread and wait
> + * until the TX thread has applied it. @a msg must stay valid until return.
> + */
> +static void
> +relay_send_is_raft_enabled(struct relay *relay,
> +			   struct relay_is_raft_enabled_msg *msg, bool value)
> +{
> +	msg->route[0].f = tx_set_is_raft_enabled;
> +	msg->route[0].pipe = &relay->relay_pipe;
> +	msg->route[1].f = relay_set_is_raft_enabled;
> +	msg->route[1].pipe = NULL;
> +	msg->relay = relay;
> +	msg->value = value;
> +	msg->is_finished = false;
> +	cmsg_init(&msg->base, msg->route);
> +	cpipe_push(&relay->tx_pipe, &msg->base);
> +	/*
> +	 * cbus_call() can't be used here: it relies on the caller thread's
> +	 * scheduler fiber running cbus_process() on every wakeup. The relay
> +	 * thread does not work that way - its scheduler only wakes the main
> +	 * relay fiber, which calls cbus_process() manually when it is ready.
> +	 * So cbus_call() would hang. Instead, process the endpoint here and
> +	 * yield until the second hop marks the message as finished.
> +	 */
> +	while (!msg->is_finished) {
> +		cbus_process(&relay->endpoint);
> +		if (msg->is_finished)
> +			break;
> +		fiber_yield();
> +	}
> +}
> +
>   /**
>    * A libev callback invoked when a relay client socket is ready
>    * for read. This currently only happens when the client closes
> @@ -593,6 +667,10 @@ relay_subscribe_f(va_list ap)
>   	cbus_pair("tx", relay->endpoint.name, &relay->tx_pipe,
>   		  &relay->relay_pipe, NULL, NULL, cbus_process);
>   
> +	struct relay_is_raft_enabled_msg raft_enabler;
> +	if (!relay->replica->anon)
> +		relay_send_is_raft_enabled(relay, &raft_enabler, true);
> +
>   	/*
>   	 * Setup garbage collection trigger.
>   	 * Not needed for anonymous replicas, since they
> @@ -672,6 +750,9 @@ relay_subscribe_f(va_list ap)
>   		cpipe_push(&relay->tx_pipe, &relay->status_msg.msg);
>   	}
>   
> +	if (!relay->replica->anon)
> +		relay_send_is_raft_enabled(relay, &raft_enabler, false);
> +
>   	/*
>   	 * Log the error that caused the relay to break the loop.
>   	 * Don't clear the error for status reporting.
> @@ -821,6 +902,13 @@ relay_raft_msg_push(struct cmsg *base)
>   void
>   relay_push_raft(struct relay *relay, const struct raft_request *req)
>   {
> +	/*
> +	 * Raft updates are not queued. They are dropped if they can't be
> +	 * pushed right now. This is fine as long as relays live much longer
> +	 * than the Raft timeouts.
> +	 */
> +	if (!relay->tx.is_raft_enabled)
> +		return;
>   	/*
>   	 * XXX: the message should be preallocated. It should
>   	 * work like Kharon in IProto. Relay should have 2 raft


Hi! Thanks for the patch! LGTM.

-- 
Serge Petrenko



More information about the Tarantool-patches mailing list