Date: Mon, 19 Apr 2021 15:11:08 +0300
To: Vladislav Shpilevoy, gorcunov@gmail.com
Cc: tarantool-patches@dev.tarantool.org
Subject: Re: [Tarantool-patches] [PATCH v4 13/12] replication: send accumulated Raft messages after relay start
From: Serge Petrenko

On 18.04.2021 19:03, Vladislav Shpilevoy wrote:
> Good job on the patch!
>
> See 4 comments below.
>
>> diff --git a/src/box/relay.cc b/src/box/relay.cc
>> index 7be33ee31..9fdd02bc1 100644
>> --- a/src/box/relay.cc
>> +++ b/src/box/relay.cc
>> @@ -160,6 +160,16 @@ struct relay {
>>           * anonymous replica, for example.
>>           */
>>          bool is_raft_enabled;
>> +        /** Is set to true by the first Raft broadcast which comes while

Hi! Thanks for the review!

My answers are irrelevant, since I took your advice from the last comment
and reworked the commit. Anyway, here they are, and the new patch is below.

> 1. Should be a new line after /**.

Sorry for the misprint, fixed.

>
>> +         * the relay is not yet ready to dispatch Raft messages.
>> +         */
>> +        bool has_pending_broadcast;
>> +        /**
>> +         * A Raft broadcast which should be pushed once relay notifies
>> +         * tx it needs Raft updates. Otherwise this message would be
>> +         * lost until some new Raft event happens.
>> +         */
>> +        struct raft_request pending_broadcast;
> 2. I wouldn't call them 'broadcasts'. Relay sends a single message to
> the remote node, not to all the nodes. This is a broadcast on the raft
> level. On relay level it is just a single message to one node.

Ok, let it be `pending_raft_msg` and `has_pending_raft_msg` then.

>
>>      } tx;
>>  };
>> @@ -635,14 +649,28 @@ tx_set_is_raft_enabled(struct cmsg *base)
>>      struct relay_is_raft_enabled_msg *msg =
>>          (struct relay_is_raft_enabled_msg *)base;
>>      msg->relay->tx.is_raft_enabled = msg->value;
>> +    if (msg->relay->tx.has_pending_broadcast) {
>> +        msg->has_pending_broadcast = true;
>> +        msg->req = msg->relay->tx.pending_broadcast;
> 3. Since you will deliver the broadcast now, it is not pending
> anymore. Hence there must be msg->relay->tx.has_pending_broadcast = false
> in the end.

Yep, fixed.

>
>> +    }
>>  }
>> @@ -964,12 +1008,15 @@ void
>>  relay_push_raft(struct relay *relay, const struct raft_request *req)
>>  {
>>      /*
>> -     * Raft updates don't stack. They are thrown away if can't be pushed
>> -     * now. This is fine, as long as relay's live much longer that the
>> -     * timeouts in Raft are set.
>> +     * Remember the latest Raft update. It might be a notification that
>> +     * this node is a leader. If sometime later we find out this node needs
>> +     * Raft updates, we need to send is_leader notification.
>>       */
>> -    if (!relay->tx.is_raft_enabled)
>> +    if (!relay->tx.is_raft_enabled) {
>> +        relay->tx.has_pending_broadcast = true;
>> +        relay->tx.pending_broadcast = *req;
> 4. Vclock memory does not belong to the request. This is why below we copy
> it into the message's memory. You might need to do the same here.

Yes, indeed. Thanks!

>
>>          return;
>> +    }
>>      /*
>>       * XXX: the message should be preallocated. It should
>>       * work like Kharon in IProto. Relay should have 2 raft
> We could also fix it like described in this XXX, could we?

Yep. I didn't realise that at first. The new patch is below.

==================================

replication: send accumulated Raft messages after relay start

It may happen that a Raft leader fails to send a broadcast to the
freshly connected follower.

Here's what happens: a follower subscribes to a candidate during
ongoing elections. box_process_subscribe() sends out the current node's
Raft state, which is candidate. Suppose a relay from the follower to the
candidate is already set up. The follower immediately responds to the
vote request. This makes the candidate become leader. But the candidate's
relay is not yet ready to process Raft messages, so the is_leader message
from the candidate gets rejected.

Once the relay starts, it relays all the xlogs, but the follower rejects
all the data because it hasn't received the is_leader notification from
the candidate.

Fix this by sending the last rejected message as soon as the relay starts
dispatching Raft messages.

Also, while we're at it, rework relay_push_raft to use a pair of
pre-allocated raft messages instead of allocating a new one on every raft
state update.

Follow-up #5445
---
 src/box/relay.cc | 122 ++++++++++++++++++++++++++++++++---------------
 1 file changed, 83 insertions(+), 39 deletions(-)

diff --git a/src/box/relay.cc b/src/box/relay.cc
index 7be33ee31..85f335cd7 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -87,6 +87,19 @@ struct relay_gc_msg {
     struct vclock vclock;
 };
 
+/**
+ * Cbus message to push raft messages to relay.
+ */
+struct relay_raft_msg {
+    struct cmsg base;
+    struct cmsg_hop route[2];
+    struct raft_request req;
+    struct vclock vclock;
+    bool do_restart_recovery;
+    struct relay *relay;
+};
+
+
 /** State of a replication relay. */
 struct relay {
     /** The thread in which we relay data to the replica. */
@@ -160,6 +173,24 @@ struct relay {
          * anonymous replica, for example.
          */
         bool is_raft_enabled;
+        /**
+         * A pair of raft messages travelling between tx and relay
+         * threads. While one is en route, the other is ready to save
+         * the next incoming raft message.
+         */
+        struct relay_raft_msg raft_msgs[2];
+        /**
+         * Id of the raft message waiting in tx thread and ready to
+         * save Raft requests. May be either 0 or 1.
+         */
+        int raft_ready_msg;
+        /** Whether raft_ready_msg holds a saved Raft message */
+        bool is_raft_push_pending;
+        /**
+         * Whether any of the messages is en route between tx and
+         * relay.
+         */
+        bool is_raft_push_sent;
     } tx;
 };
 
@@ -628,13 +659,38 @@ struct relay_is_raft_enabled_msg {
     bool is_finished;
 };
 
+static void
+relay_push_raft_msg(struct relay *relay, bool do_restart_recovery)
+{
+    if (!relay->tx.is_raft_enabled || relay->tx.is_raft_push_sent)
+        return;
+    struct relay_raft_msg *msg =
+        &relay->tx.raft_msgs[relay->tx.raft_ready_msg];
+    msg->do_restart_recovery = do_restart_recovery;
+    cpipe_push(&relay->relay_pipe, &msg->base);
+    relay->tx.raft_ready_msg = (relay->tx.raft_ready_msg + 1) % 2;
+    relay->tx.is_raft_push_sent = true;
+    relay->tx.is_raft_push_pending = false;
+}
+
 /** TX thread part of the Raft flag setting, first hop. */
 static void
 tx_set_is_raft_enabled(struct cmsg *base)
 {
     struct relay_is_raft_enabled_msg *msg =
         (struct relay_is_raft_enabled_msg *)base;
-    msg->relay->tx.is_raft_enabled = msg->value;
+    struct relay *relay  = msg->relay;
+    relay->tx.is_raft_enabled = msg->value;
+    /*
+     * Send saved raft message as soon as relay becomes operational.
+     * Do not restart recovery upon the message arrival. Recovery is
+     * positioned at replica_clock initially, i.e. already "restarted" and
+     * restarting it once again would position it at the oldest xlog
+     * possible, because relay reader hasn't received replica vclock yet.
+     */
+    if (relay->tx.is_raft_push_pending) {
+        relay_push_raft_msg(msg->relay, false);
+    }
 }
 
 /** Relay thread part of the Raft flag setting, second hop. */
@@ -930,14 +986,10 @@ relay_restart_recovery(struct relay *relay)
     recover_remaining_wals(relay->r, &relay->stream, NULL, true);
 }
 
-struct relay_raft_msg {
-    struct cmsg base;
-    struct cmsg_hop route;
-    struct raft_request req;
-    struct vclock vclock;
-    struct relay *relay;
-};
-
+/**
+ * Send a Raft message to the peer. This is done asynchronously, out of scope
+ * of recover_remaining_wals loop.
+ */
 static void
 relay_raft_msg_push(struct cmsg *base)
 {
@@ -951,54 +1003,46 @@ relay_raft_msg_push(struct cmsg *base)
          * would be ignored again.
          */
         relay_send(msg->relay, &row);
-        if (msg->req.state == RAFT_STATE_LEADER)
+        if (msg->req.state == RAFT_STATE_LEADER &&
+            msg->do_restart_recovery)
             relay_restart_recovery(msg->relay);
     } catch (Exception *e) {
         relay_set_error(msg->relay, e);
         fiber_cancel(fiber());
     }
-    free(msg);
+}
+
+static void
+tx_raft_msg_return(struct cmsg *base)
+{
+    struct relay_raft_msg *msg = (struct relay_raft_msg *)base;
+    msg->relay->tx.is_raft_push_sent = false;
+    if (msg->relay->tx.is_raft_push_pending)
+        relay_push_raft_msg(msg->relay, true);
 }
 
 void
 relay_push_raft(struct relay *relay, const struct raft_request *req)
 {
+    struct relay_raft_msg *msg =
+        &relay->tx.raft_msgs[relay->tx.raft_ready_msg];
     /*
-     * Raft updates don't stack. They are thrown away if can't be pushed
-     * now. This is fine, as long as relay's live much longer that the
-     * timeouts in Raft are set.
-     */
-    if (!relay->tx.is_raft_enabled)
-        return;
-    /*
-     * XXX: the message should be preallocated. It should
-     * work like Kharon in IProto. Relay should have 2 raft
-     * messages rotating. When one is sent, the other can be
-     * updated and a flag is set. When the first message is
-     * sent, the control returns to TX thread, sees the set
-     * flag, rotates the buffers, and sends it again. And so
-     * on. This is how it can work in future, with 0 heap
-     * allocations. Current solution with alloc-per-update is
-     * good enough as a start. Another option - wait until all
-     * is moved to WAL thread, where this will all happen
-     * in one thread and will be much simpler.
+     * Overwrite the request in raft_ready_msg. Only the latest raft request
+     * is saved.
      */
-    struct relay_raft_msg *msg =
-        (struct relay_raft_msg *)malloc(sizeof(*msg));
-    if (msg == NULL) {
-        panic("Couldn't allocate raft message");
-        return;
-    }
     msg->req = *req;
     if (req->vclock != NULL) {
         msg->req.vclock = &msg->vclock;
         vclock_copy(&msg->vclock, req->vclock);
     }
-    msg->route.f = relay_raft_msg_push;
-    msg->route.pipe = NULL;
-    cmsg_init(&msg->base, &msg->route);
+    msg->route[0].f = relay_raft_msg_push;
+    msg->route[0].pipe = &relay->tx_pipe;
+    msg->route[1].f = tx_raft_msg_return;
+    msg->route[1].pipe = NULL;
+    cmsg_init(&msg->base, msg->route);
     msg->relay = relay;
-    cpipe_push(&relay->relay_pipe, &msg->base);
+    relay->tx.is_raft_push_pending = true;
+    relay_push_raft_msg(relay, true);
 }
 
 /** Send a single row to the client. */
-- 
2.24.3 (Apple Git-128)

-- 
Serge Petrenko
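The rotating pair of pre-allocated messages, together with the vclock ownership issue from review comment 4, can be illustrated with a minimal single-threaded C sketch. It is not Tarantool code: `struct raft_pusher`, `pusher_push()`, `pusher_enable()` and `pusher_complete()` are hypothetical names, `struct vclock` and `struct raft_request` are simplified stand-ins, and the cbus round trip (`cpipe_push()` to the relay thread plus the return hop to tx) is replaced by a direct call to `pusher_complete()`.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

/* Simplified stand-ins for Tarantool's types (assumption, not the real ones). */
enum { VCLOCK_MAX = 4 };

struct vclock {
	int64_t lsn[VCLOCK_MAX];
};

struct raft_request {
	int term;
	int state;
	/* Borrowed: the caller owns this memory and may reuse it. */
	const struct vclock *vclock;
};

/* One pre-allocated message. It owns its own copy of the vclock. */
struct raft_msg {
	struct raft_request req;
	struct vclock vclock;
};

/* Hypothetical tx-side state, loosely mirroring relay->tx in the patch. */
struct raft_pusher {
	struct raft_msg msgs[2];
	int ready;		/* Slot that may be overwritten, 0 or 1. */
	bool is_pending;	/* The ready slot holds an unsent request. */
	bool is_sent;		/* The other slot is in flight. */
	bool is_enabled;	/* The peer has asked for Raft updates. */
	int delivered;		/* Messages handled by the "relay" side. */
	int last_term;		/* Term of the last delivered request. */
};

/* Send the ready slot, if allowed. At most one message is in flight. */
static void
pusher_try_send(struct raft_pusher *p)
{
	if (!p->is_enabled || p->is_sent || !p->is_pending)
		return;
	/* The real code would cpipe_push() here; the sketch only rotates. */
	p->ready = (p->ready + 1) % 2;
	p->is_sent = true;
	p->is_pending = false;
}

/* Save the latest request, deep-copying the vclock, then try to send. */
static void
pusher_push(struct raft_pusher *p, const struct raft_request *req)
{
	struct raft_msg *msg = &p->msgs[p->ready];
	msg->req = *req;
	if (req->vclock != NULL) {
		memcpy(&msg->vclock, req->vclock, sizeof(msg->vclock));
		msg->req.vclock = &msg->vclock;
	}
	p->is_pending = true;
	pusher_try_send(p);
}

/* The peer became ready: flush whatever was saved while it was not. */
static void
pusher_enable(struct raft_pusher *p)
{
	p->is_enabled = true;
	pusher_try_send(p);
}

/* Stands in for the relay-side hop plus the return hop to tx. */
static void
pusher_complete(struct raft_pusher *p)
{
	assert(p->is_sent);
	const struct raft_msg *msg = &p->msgs[(p->ready + 1) % 2];
	p->delivered++;
	p->last_term = msg->req.term;
	p->is_sent = false;
	pusher_try_send(p);
}
```

In this sketch, a request pushed before `pusher_enable()` is only saved, and enabling the peer sends it, which corresponds to the is_leader case the commit message describes. While one slot is in flight, repeated pushes overwrite the other slot, so only the latest state goes out on the return hop, and no heap allocation happens per update. Copying the vclock into the slot means the caller may reuse its own vclock memory right after the push.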