From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from smtp18.mail.ru (smtp18.mail.ru [94.100.176.155]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id 49CD1469719 for ; Mon, 21 Sep 2020 10:13:21 +0300 (MSK) References: <17979db071a67d8e5f299fd405095dc14a507b3c.1599693319.git.v.shpilevoy@tarantool.org> From: Serge Petrenko Message-ID: <1a43da05-7a47-369f-068d-66687ea34672@tarantool.org> Date: Mon, 21 Sep 2020 10:13:19 +0300 MIME-Version: 1.0 In-Reply-To: Content-Type: text/plain; charset="utf-8"; format="flowed" Content-Transfer-Encoding: 8bit Content-Language: ru Subject: Re: [Tarantool-patches] [PATCH v2 08/11] raft: relay status updates to followers List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: Vladislav Shpilevoy , tarantool-patches@dev.tarantool.org, gorcunov@gmail.com 20.09.2020 20:17, Vladislav Shpilevoy пишет: > Hi! Consider my fixes on top of the branch for this commit. > > ==================== > > Before this patch sometimes an attempt to push a Raft update into > a relay thread could crash. That happened because relay status > being FOLLOW does not mean, that the relay thread is running, and > its endpoint is initialized. FOLLOW state is installed before the > thread is started. Until the thread is started and initialized its > endpoint, the relay->relay_pipe is garbage. An attempt to push a > message into it from relay_push_raft() led to a crash. > > This patch introduces a flag relay->is_raft_enabled. When it is > false, nothing can be pushed to the relay thread. It is installed > to true by the thread itself after it initializes its endpoint. > > It is worth leaving a few words why a simple cbus_call() didn't > work, called from the relay thread to call a function in the TX > thread to set the flag. > > The problem with cbus_call() is that it assumes the caller > thread's scheduler fiber will call cbus_process() on each wakeup. > > In the relay thread the scheduler fiber only wakes up the main > relay fiber, which in turn may call cbus_process() some time > later. So in this patch the flag setting is implemented a bit > differently. With manual calls of cbus_process() and > fiber_yield() to implement kind of 'cbus_call()', but specially > for the relay thread. > > The Raft test still may crash, but in a new place, somewhere in > applier_handle_raft when touches vclock. > --- > src/box/raft.c | 8 ++--- > src/box/relay.cc | 88 ++++++++++++++++++++++++++++++++++++++++++++++++ > 2 files changed, 90 insertions(+), 6 deletions(-) > > diff --git a/src/box/raft.c b/src/box/raft.c > index b3ab94bd7..7b7ef9c1c 100644 > --- a/src/box/raft.c > +++ b/src/box/raft.c > @@ -955,12 +955,8 @@ raft_cfg_death_timeout(void) > void > raft_broadcast(const struct raft_request *req) > { > - replicaset_foreach(replica) { > - if (replica->relay != NULL && replica->id != REPLICA_ID_NIL && > - relay_get_state(replica->relay) == RELAY_FOLLOW) { > - relay_push_raft(replica->relay, req); > - } > - } > + replicaset_foreach(replica) > + relay_push_raft(replica->relay, req); > } > > void > diff --git a/src/box/relay.cc b/src/box/relay.cc > index d63711600..096f455a1 100644 > --- a/src/box/relay.cc > +++ b/src/box/relay.cc > @@ -146,6 +146,12 @@ struct relay { > alignas(CACHELINE_SIZE) > /** Known relay vclock. */ > struct vclock vclock; > + /** > + * True if the relay needs Raft updates. It can live fine > + * without sending Raft updates, if it is a relay to an > + * anonymous replica, for example. > + */ > + bool is_raft_enabled; > } tx; > }; > > @@ -573,6 +579,74 @@ relay_send_heartbeat(struct relay *relay) > } > } > > +/** A message to set Raft enabled flag in TX thread from a relay thread. */ > +struct relay_is_raft_enabled_msg { > + /** Base cbus message. */ > + struct cmsg base; > + /** > + * First hop - TX thread, second hop - a relay thread, to notify about > + * the flag being set. > + */ > + struct cmsg_hop route[2]; > + /** Relay pointer to set the flag in. */ > + struct relay *relay; > + /** New flag value. */ > + bool value; > + /** Flag to wait for the flag being set, in a relay thread. */ > + bool is_finished; > +}; > + > +/** TX thread part of the Raft flag setting, first hop. */ > +static void > +tx_set_is_raft_enabled(struct cmsg *base) > +{ > + struct relay_is_raft_enabled_msg *msg = > + (struct relay_is_raft_enabled_msg *)base; > + msg->relay->tx.is_raft_enabled = msg->value; > +} > + > +/** Relay thread part of the Raft flag setting, second hop. */ > +static void > +relay_set_is_raft_enabled(struct cmsg *base) > +{ > + struct relay_is_raft_enabled_msg *msg = > + (struct relay_is_raft_enabled_msg *)base; > + msg->is_finished = true; > +} > + > +/** > + * Set relay Raft enabled flag from a relay thread to be accessed by the TX > + * thread. > + */ > +static void > +relay_send_is_raft_enabled(struct relay *relay, > + struct relay_is_raft_enabled_msg *msg, bool value) > +{ > + msg->route[0].f = tx_set_is_raft_enabled; > + msg->route[0].pipe = &relay->relay_pipe; > + msg->route[1].f = relay_set_is_raft_enabled; > + msg->route[1].pipe = NULL; > + msg->relay = relay; > + msg->value = value; > + msg->is_finished = false; > + cmsg_init(&msg->base, msg->route); > + cpipe_push(&relay->tx_pipe, &msg->base); > + /* > + * cbus_call() can't be used, because it works only if the sender thread > + * is a simple cbus_process() loop. But the relay thread is not - > + * instead it calls cbus_process() manually when ready. And the thread > + * loop consists of the main fiber wakeup. So cbus_call() would just > + * hang, because cbus_process() wouldn't be called by the scheduler > + * fiber. > + */ > + while (!msg->is_finished) { > + cbus_process(&relay->endpoint); > + if (msg->is_finished) > + break; > + fiber_yield(); > + } > +} > + > /** > * A libev callback invoked when a relay client socket is ready > * for read. This currently only happens when the client closes > @@ -593,6 +667,10 @@ relay_subscribe_f(va_list ap) > cbus_pair("tx", relay->endpoint.name, &relay->tx_pipe, > &relay->relay_pipe, NULL, NULL, cbus_process); > > + struct relay_is_raft_enabled_msg raft_enabler; > + if (!relay->replica->anon) > + relay_send_is_raft_enabled(relay, &raft_enabler, true); > + > /* > * Setup garbage collection trigger. > * Not needed for anonymous replicas, since they > @@ -672,6 +750,9 @@ relay_subscribe_f(va_list ap) > cpipe_push(&relay->tx_pipe, &relay->status_msg.msg); > } > > + if (!relay->replica->anon) > + relay_send_is_raft_enabled(relay, &raft_enabler, false); > + > /* > * Log the error that caused the relay to break the loop. > * Don't clear the error for status reporting. > @@ -821,6 +902,13 @@ relay_raft_msg_push(struct cmsg *base) > void > relay_push_raft(struct relay *relay, const struct raft_request *req) > { > + /* > + * Raft updates don't stack. They are thrown away if can't be pushed > + * now. This is fine, as long as relay's live much longer that the > + * timeouts in Raft are set. > + */ > + if (!relay->tx.is_raft_enabled) > + return; > /* > * XXX: the message should be preallocated. It should > * work like Kharon in IProto. Relay should have 2 raft HI! Thanks for the patch! LGTM. -- Serge Petrenko