[Tarantool-patches] [PATCH v2 08/11] raft: relay status updates to followers
Serge Petrenko
sergepetrenko at tarantool.org
Mon Sep 21 10:13:19 MSK 2020
20.09.2020 20:17, Vladislav Shpilevoy пишет:
> Hi! Consider my fixes on top of the branch for this commit.
>
> ====================
>
> Before this patch an attempt to push a Raft update into a relay
> thread could sometimes crash. That happened because the relay
> status being FOLLOW does not mean that the relay thread is running
> and its endpoint is initialized. The FOLLOW state is installed
> before the thread is started. Until the thread has started and
> initialized its endpoint, relay->relay_pipe is garbage. An attempt
> to push a message into it from relay_push_raft() led to a crash.
>
> This patch introduces a flag relay->tx.is_raft_enabled. While it
> is false, nothing can be pushed to the relay thread. It is set to
> true on request of the relay thread itself, after the thread
> initializes its endpoint.
>
> It is worth explaining why a simple cbus_call(), made from the
> relay thread to run a function in the TX thread that sets the
> flag, didn't work.
>
> The problem with cbus_call() is that it assumes the caller
> thread's scheduler fiber will call cbus_process() on each wakeup.
>
> In the relay thread the scheduler fiber only wakes up the main
> relay fiber, which in turn may call cbus_process() some time
> later. So in this patch the flag setting is implemented a bit
> differently: with manual calls of cbus_process() and
> fiber_yield(), which implement a kind of 'cbus_call()' made
> specially for the relay thread.
>
> The Raft test may still crash, but in a new place: somewhere in
> applier_handle_raft, when it touches the vclock.
> ---
> src/box/raft.c | 8 ++---
> src/box/relay.cc | 88 ++++++++++++++++++++++++++++++++++++++++++++++++
> 2 files changed, 90 insertions(+), 6 deletions(-)
>
> diff --git a/src/box/raft.c b/src/box/raft.c
> index b3ab94bd7..7b7ef9c1c 100644
> --- a/src/box/raft.c
> +++ b/src/box/raft.c
> @@ -955,12 +955,8 @@ raft_cfg_death_timeout(void)
> void
> raft_broadcast(const struct raft_request *req)
> {
> - replicaset_foreach(replica) {
> - if (replica->relay != NULL && replica->id != REPLICA_ID_NIL &&
> - relay_get_state(replica->relay) == RELAY_FOLLOW) {
> - relay_push_raft(replica->relay, req);
> - }
> - }
> + replicaset_foreach(replica)
> + relay_push_raft(replica->relay, req);
> }
>
> void
> diff --git a/src/box/relay.cc b/src/box/relay.cc
> index d63711600..096f455a1 100644
> --- a/src/box/relay.cc
> +++ b/src/box/relay.cc
> @@ -146,6 +146,12 @@ struct relay {
> alignas(CACHELINE_SIZE)
> /** Known relay vclock. */
> struct vclock vclock;
> + /**
> + * True if the relay needs Raft updates. It can live fine
> + * without sending Raft updates, if it is a relay to an
> + * anonymous replica, for example.
> + */
> + bool is_raft_enabled;
> } tx;
> };
>
> @@ -573,6 +579,74 @@ relay_send_heartbeat(struct relay *relay)
> }
> }
>
> +/** A message to set Raft enabled flag in TX thread from a relay thread. */
> +struct relay_is_raft_enabled_msg {
> + /** Base cbus message. */
> + struct cmsg base;
> + /**
> + * First hop - TX thread, second hop - a relay thread, to notify about
> + * the flag being set.
> + */
> + struct cmsg_hop route[2];
> + /** Relay pointer to set the flag in. */
> + struct relay *relay;
> + /** New flag value. */
> + bool value;
> + /** Flag to wait for the flag being set, in a relay thread. */
> + bool is_finished;
> +};
> +
> +/** TX thread part of the Raft flag setting, first hop. */
> +static void
> +tx_set_is_raft_enabled(struct cmsg *base)
> +{
> + struct relay_is_raft_enabled_msg *msg =
> + (struct relay_is_raft_enabled_msg *)base;
> + msg->relay->tx.is_raft_enabled = msg->value;
> +}
> +
> +/** Relay thread part of the Raft flag setting, second hop. */
> +static void
> +relay_set_is_raft_enabled(struct cmsg *base)
> +{
> + struct relay_is_raft_enabled_msg *msg =
> + (struct relay_is_raft_enabled_msg *)base;
> + msg->is_finished = true;
> +}
> +
> +/**
> + * Set relay Raft enabled flag from a relay thread to be accessed by the TX
> + * thread.
> + */
> +static void
> +relay_send_is_raft_enabled(struct relay *relay,
> + struct relay_is_raft_enabled_msg *msg, bool value)
> +{
> + msg->route[0].f = tx_set_is_raft_enabled;
> + msg->route[0].pipe = &relay->relay_pipe;
> + msg->route[1].f = relay_set_is_raft_enabled;
> + msg->route[1].pipe = NULL;
> + msg->relay = relay;
> + msg->value = value;
> + msg->is_finished = false;
> + cmsg_init(&msg->base, msg->route);
> + cpipe_push(&relay->tx_pipe, &msg->base);
> + /*
> + * cbus_call() can't be used, because it works only if the sender thread
> + * is a simple cbus_process() loop. But the relay thread is not -
> + * instead it calls cbus_process() manually when ready. And the thread
> + * loop consists of the main fiber wakeup. So cbus_call() would just
> + * hang, because cbus_process() wouldn't be called by the scheduler
> + * fiber.
> + */
> + while (!msg->is_finished) {
> + cbus_process(&relay->endpoint);
> + if (msg->is_finished)
> + break;
> + fiber_yield();
> + }
> +}
> +
> /**
> * A libev callback invoked when a relay client socket is ready
> * for read. This currently only happens when the client closes
> @@ -593,6 +667,10 @@ relay_subscribe_f(va_list ap)
> cbus_pair("tx", relay->endpoint.name, &relay->tx_pipe,
> &relay->relay_pipe, NULL, NULL, cbus_process);
>
> + struct relay_is_raft_enabled_msg raft_enabler;
> + if (!relay->replica->anon)
> + relay_send_is_raft_enabled(relay, &raft_enabler, true);
> +
> /*
> * Setup garbage collection trigger.
> * Not needed for anonymous replicas, since they
> @@ -672,6 +750,9 @@ relay_subscribe_f(va_list ap)
> cpipe_push(&relay->tx_pipe, &relay->status_msg.msg);
> }
>
> + if (!relay->replica->anon)
> + relay_send_is_raft_enabled(relay, &raft_enabler, false);
> +
> /*
> * Log the error that caused the relay to break the loop.
> * Don't clear the error for status reporting.
> @@ -821,6 +902,13 @@ relay_raft_msg_push(struct cmsg *base)
> void
> relay_push_raft(struct relay *relay, const struct raft_request *req)
> {
> + /*
> + * Raft updates don't stack. They are thrown away if they can't be
> + * pushed now. This is fine, as long as relays live much longer than
> + * the timeouts set in Raft.
> + */
> + if (!relay->tx.is_raft_enabled)
> + return;
> /*
> * XXX: the message should be preallocated. It should
> * work like Kharon in IProto. Relay should have 2 raft
Hi! Thanks for the patch! LGTM.
--
Serge Petrenko
More information about the Tarantool-patches
mailing list