From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from smtpng2.m.smailru.net (smtpng2.m.smailru.net [94.100.179.3]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id 29EAD469719 for ; Sun, 20 Sep 2020 20:17:45 +0300 (MSK) From: Vladislav Shpilevoy References: <17979db071a67d8e5f299fd405095dc14a507b3c.1599693319.git.v.shpilevoy@tarantool.org> Message-ID: Date: Sun, 20 Sep 2020 19:17:43 +0200 MIME-Version: 1.0 In-Reply-To: <17979db071a67d8e5f299fd405095dc14a507b3c.1599693319.git.v.shpilevoy@tarantool.org> Content-Type: text/plain; charset=utf-8 Content-Language: en-US Content-Transfer-Encoding: 7bit Subject: Re: [Tarantool-patches] [PATCH v2 08/11] raft: relay status updates to followers List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: tarantool-patches@dev.tarantool.org, sergepetrenko@tarantool.org, gorcunov@gmail.com Hi! Consider my fixes on top of the branch for this commit. ==================== Before this patch sometimes an attempt to push a Raft update into a relay thread could crash. That happened because relay status being FOLLOW does not mean, that the relay thread is running, and its endpoint is initialized. FOLLOW state is installed before the thread is started. Until the thread is started and initialized its endpoint, the relay->relay_pipe is garbage. An attempt to push a message into it from relay_push_raft() led to a crash. This patch introduces a flag relay->is_raft_enabled. When it is false, nothing can be pushed to the relay thread. It is installed to true by the thread itself after it initializes its endpoint. It is worth leaving a few words why a simple cbus_call() didn't work, called from the relay thread to call a function in the TX thread to set the flag. The problem with cbus_call() is that it assumes the caller thread's scheduler fiber will call cbus_process() on each wakeup. In the relay thread the scheduler fiber only wakes up the main relay fiber, which in turn may call cbus_process() some time later. So in this patch the flag setting is implemented a bit differently. With manual calls of cbus_process() and fiber_yield() to implement kind of 'cbus_call()', but specially for the relay thread. The Raft test still may crash, but in a new place, somewhere in applier_handle_raft when touches vclock. --- src/box/raft.c | 8 ++--- src/box/relay.cc | 88 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 90 insertions(+), 6 deletions(-) diff --git a/src/box/raft.c b/src/box/raft.c index b3ab94bd7..7b7ef9c1c 100644 --- a/src/box/raft.c +++ b/src/box/raft.c @@ -955,12 +955,8 @@ raft_cfg_death_timeout(void) void raft_broadcast(const struct raft_request *req) { - replicaset_foreach(replica) { - if (replica->relay != NULL && replica->id != REPLICA_ID_NIL && - relay_get_state(replica->relay) == RELAY_FOLLOW) { - relay_push_raft(replica->relay, req); - } - } + replicaset_foreach(replica) + relay_push_raft(replica->relay, req); } void diff --git a/src/box/relay.cc b/src/box/relay.cc index d63711600..096f455a1 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -146,6 +146,12 @@ struct relay { alignas(CACHELINE_SIZE) /** Known relay vclock. */ struct vclock vclock; + /** + * True if the relay needs Raft updates. It can live fine + * without sending Raft updates, if it is a relay to an + * anonymous replica, for example. + */ + bool is_raft_enabled; } tx; }; @@ -573,6 +579,74 @@ relay_send_heartbeat(struct relay *relay) } } +/** A message to set Raft enabled flag in TX thread from a relay thread. */ +struct relay_is_raft_enabled_msg { + /** Base cbus message. */ + struct cmsg base; + /** + * First hop - TX thread, second hop - a relay thread, to notify about + * the flag being set. + */ + struct cmsg_hop route[2]; + /** Relay pointer to set the flag in. */ + struct relay *relay; + /** New flag value. */ + bool value; + /** Flag to wait for the flag being set, in a relay thread. */ + bool is_finished; +}; + +/** TX thread part of the Raft flag setting, first hop. */ +static void +tx_set_is_raft_enabled(struct cmsg *base) +{ + struct relay_is_raft_enabled_msg *msg = + (struct relay_is_raft_enabled_msg *)base; + msg->relay->tx.is_raft_enabled = msg->value; +} + +/** Relay thread part of the Raft flag setting, second hop. */ +static void +relay_set_is_raft_enabled(struct cmsg *base) +{ + struct relay_is_raft_enabled_msg *msg = + (struct relay_is_raft_enabled_msg *)base; + msg->is_finished = true; +} + +/** + * Set relay Raft enabled flag from a relay thread to be accessed by the TX + * thread. + */ +static void +relay_send_is_raft_enabled(struct relay *relay, + struct relay_is_raft_enabled_msg *msg, bool value) +{ + msg->route[0].f = tx_set_is_raft_enabled; + msg->route[0].pipe = &relay->relay_pipe; + msg->route[1].f = relay_set_is_raft_enabled; + msg->route[1].pipe = NULL; + msg->relay = relay; + msg->value = value; + msg->is_finished = false; + cmsg_init(&msg->base, msg->route); + cpipe_push(&relay->tx_pipe, &msg->base); + /* + * cbus_call() can't be used, because it works only if the sender thread + * is a simple cbus_process() loop. But the relay thread is not - + * instead it calls cbus_process() manually when ready. And the thread + * loop consists of the main fiber wakeup. So cbus_call() would just + * hang, because cbus_process() wouldn't be called by the scheduler + * fiber. + */ + while (!msg->is_finished) { + cbus_process(&relay->endpoint); + if (msg->is_finished) + break; + fiber_yield(); + } +} + /** * A libev callback invoked when a relay client socket is ready * for read. This currently only happens when the client closes @@ -593,6 +667,10 @@ relay_subscribe_f(va_list ap) cbus_pair("tx", relay->endpoint.name, &relay->tx_pipe, &relay->relay_pipe, NULL, NULL, cbus_process); + struct relay_is_raft_enabled_msg raft_enabler; + if (!relay->replica->anon) + relay_send_is_raft_enabled(relay, &raft_enabler, true); + /* * Setup garbage collection trigger. * Not needed for anonymous replicas, since they @@ -672,6 +750,9 @@ relay_subscribe_f(va_list ap) cpipe_push(&relay->tx_pipe, &relay->status_msg.msg); } + if (!relay->replica->anon) + relay_send_is_raft_enabled(relay, &raft_enabler, false); + /* * Log the error that caused the relay to break the loop. * Don't clear the error for status reporting. @@ -821,6 +902,13 @@ relay_raft_msg_push(struct cmsg *base) void relay_push_raft(struct relay *relay, const struct raft_request *req) { + /* + * Raft updates don't stack. They are thrown away if can't be pushed + * now. This is fine, as long as relay's live much longer that the + * timeouts in Raft are set. + */ + if (!relay->tx.is_raft_enabled) + return; /* * XXX: the message should be preallocated. It should * work like Kharon in IProto. Relay should have 2 raft -- 2.21.1 (Apple Git-122.3)