Tarantool development patches archive
From: Serge Petrenko via Tarantool-patches <tarantool-patches@dev.tarantool.org>
To: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>, gorcunov@gmail.com
Cc: tarantool-patches@dev.tarantool.org
Subject: Re: [Tarantool-patches] [PATCH v4 13/12] replication: send accumulated Raft messages after relay start
Date: Mon, 19 Apr 2021 15:11:08 +0300
Message-ID: <6e626b42-dddd-5ac0-3e0f-f2b92d3ac8fe@tarantool.org>
In-Reply-To: <83e7df81-078c-def7-1f73-8810676bf241@tarantool.org>



On 18.04.2021 19:03, Vladislav Shpilevoy wrote:
> Good job on the patch!
>
> See 4 comments below.
>
>> diff --git a/src/box/relay.cc b/src/box/relay.cc
>> index 7be33ee31..9fdd02bc1 100644
>> --- a/src/box/relay.cc
>> +++ b/src/box/relay.cc
>> @@ -160,6 +160,16 @@ struct relay {
>>            * anonymous replica, for example.
>>            */
>>           bool is_raft_enabled;
>> +        /** Is set to true by the first Raft broadcast which comes while


Hi! Thanks for the review!

My answers are irrelevant, since I took your advice from the last
comment and reworked the commit. Anyway, here they are, and the new
patch is below.


> 1. Should be a new line after /**.

Sorry for the misprint, fixed.

>
>> +         * the relay is not yet ready to dispatch Raft messages.
>> +         */
>> +        bool has_pending_broadcast;
>> +        /**
>> +         * A Raft broadcast which should be pushed once relay notifies
>> +         * tx it needs Raft updates. Otherwise this message would be
>> +         * lost until some new Raft event happens.
>> +         */
>> +        struct raft_request pending_broadcast;
> 2. I wouldn't call them 'broadcasts'. Relay sends a single message to
> the remote node, not to all the nodes. This is a broadcast on the raft
> level. On relay level it is just a single message to one node.

Ok, let it be `pending_raft_msg` and `has_pending_raft_msg` then.

>
>>       } tx;
>>   };
>> @@ -635,14 +649,28 @@ tx_set_is_raft_enabled(struct cmsg *base)
>>       struct relay_is_raft_enabled_msg *msg =
>>           (struct relay_is_raft_enabled_msg *)base;
>>       msg->relay->tx.is_raft_enabled = msg->value;
>> +    if (msg->relay->tx.has_pending_broadcast) {
>> +        msg->has_pending_broadcast = true;
>> +        msg->req = msg->relay->tx.pending_broadcast;
> 3. Since you will deliver the broadcast now, it is not pending
> anymore. Hence there must be msg->relay->tx.has_pending_broadcast = false
> in the end.

Yep, fixed.
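For illustration only, a minimal single-threaded sketch of the v4 approach with this fix applied: while Raft is disabled for the relay, only the latest update is remembered; it is delivered once when Raft gets enabled, and then the pending flag is cleared. The `fake_*` types and functions are hypothetical stand-ins, not Tarantool code, and the cbus hop to the relay thread is replaced by a direct `deliver()` call.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-in for struct raft_request. */
struct fake_raft_req {
	unsigned long long term;
	bool is_leader;
};

/* Hypothetical stand-in for the tx-side part of struct relay. */
struct fake_relay_tx {
	bool is_raft_enabled;
	/* Names follow the rename agreed on in review comment 2. */
	bool has_pending_raft_msg;
	struct fake_raft_req pending_raft_msg;
	/* Bookkeeping for the sketch: what reached the peer. */
	int delivered;
	struct fake_raft_req last_delivered;
};

/* Stands in for pushing a message through the relay to the peer. */
static void
deliver(struct fake_relay_tx *tx, const struct fake_raft_req *req)
{
	assert(tx->is_raft_enabled);
	tx->last_delivered = *req;
	tx->delivered++;
}

/* Analogue of relay_push_raft(): keep the latest update while disabled. */
static void
push_raft(struct fake_relay_tx *tx, const struct fake_raft_req *req)
{
	if (!tx->is_raft_enabled) {
		tx->has_pending_raft_msg = true;
		tx->pending_raft_msg = *req;
		return;
	}
	deliver(tx, req);
}

/* Analogue of tx_set_is_raft_enabled(): flush the saved update once. */
static void
set_raft_enabled(struct fake_relay_tx *tx, bool value)
{
	tx->is_raft_enabled = value;
	if (value && tx->has_pending_raft_msg) {
		deliver(tx, &tx->pending_raft_msg);
		/* Delivered now, so it is no longer pending. */
		tx->has_pending_raft_msg = false;
	}
}
```

Without the reset, a second enable would deliver the same stale update again.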

>
>> +    }
>>   }
>> @@ -964,12 +1008,15 @@ void
>>   relay_push_raft(struct relay *relay, const struct raft_request *req)
>>   {
>>       /*
>> -     * Raft updates don't stack. They are thrown away if can't be pushed
>> -     * now. This is fine, as long as relay's live much longer that the
>> -     * timeouts in Raft are set.
>> +     * Remember the latest Raft update. It might be a notification that
>> +     * this node is a leader. If sometime later we find out this node needs
>> +     * Raft updates, we need to send is_leader notification.
>>        */
>> -    if (!relay->tx.is_raft_enabled)
>> +    if (!relay->tx.is_raft_enabled) {
>> +        relay->tx.has_pending_broadcast = true;
>> +        relay->tx.pending_broadcast = *req;
> 4. Vclock memory does not belong to the request. This is why below we copy
> it into the message's memory. You might need to do the same here.

Yes, indeed. Thanks!
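A minimal sketch of the ownership problem, assuming a simplified fixed-size vclock: a plain struct assignment copies only the `vclock` pointer, which still refers to the caller's memory, so the saved message has to copy the contents into its own storage and re-point the request at it. The `fake_*` names are hypothetical, and `memcpy()` stands in for `vclock_copy()`.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical, heavily simplified vclock: a fixed array of LSNs. */
#define FAKE_VCLOCK_MAX 4
struct fake_vclock {
	int64_t lsn[FAKE_VCLOCK_MAX];
};

/* Like struct raft_request, the request only borrows the vclock. */
struct fake_raft_req {
	uint64_t term;
	const struct fake_vclock *vclock; /* may be NULL */
};

/* A saved message owns storage for the vclock it refers to. */
struct fake_saved_msg {
	struct fake_raft_req req;
	struct fake_vclock vclock;
};

static void
save_request(struct fake_saved_msg *msg, const struct fake_raft_req *req)
{
	/* Shallow copy: msg->req.vclock still points at the caller's memory. */
	msg->req = *req;
	if (req->vclock != NULL) {
		/* Copy the contents and re-point at our own storage. */
		memcpy(&msg->vclock, req->vclock, sizeof(msg->vclock));
		msg->req.vclock = &msg->vclock;
	}
}
```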
>
>>           return;
>> +    }
>>       /*
>>        * XXX: the message should be preallocated. It should
>>        * work like Kharon in IProto. Relay should have 2 raft
> We could also fix it like described in this XXX, could we?

Yep. I didn't realise that at first. The new patch is below.
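To make the rotation described in the XXX comment concrete, here is a simplified single-threaded model of the scheme. It assumes the cbus round trip can be modeled as two explicit events: `toy_push_msg()` records what `cpipe_push()` would send, and `toy_msg_return()` stands for the message coming back to tx. All `toy_*` names are hypothetical and only mirror the tx-side fields and functions of the patch.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical simplified Raft request. */
struct toy_req {
	int term;
	int state;
};

struct toy_msg {
	struct toy_req req;
	bool do_restart_recovery;
};

/* Mirrors the tx-side fields added to struct relay. */
struct toy_relay {
	bool is_raft_enabled;
	struct toy_msg msgs[2];
	int ready;            /* index of the message owned by tx */
	bool is_push_pending; /* msgs[ready] holds an unsent request */
	bool is_push_sent;    /* the other message is en route */
	int pushed;           /* number of simulated cpipe_push() calls */
	struct toy_req last_pushed;
};

/* Analogue of relay_push_raft_msg(). */
static void
toy_push_msg(struct toy_relay *r, bool do_restart_recovery)
{
	if (!r->is_raft_enabled || r->is_push_sent)
		return;
	struct toy_msg *msg = &r->msgs[r->ready];
	msg->do_restart_recovery = do_restart_recovery;
	/* Stand-in for cpipe_push(): record what was sent. */
	r->last_pushed = msg->req;
	r->pushed++;
	r->ready = (r->ready + 1) % 2;
	r->is_push_sent = true;
	r->is_push_pending = false;
}

/* Analogue of relay_push_raft(): only the latest request is kept. */
static void
toy_push_raft(struct toy_relay *r, const struct toy_req *req)
{
	r->msgs[r->ready].req = *req;
	r->is_push_pending = true;
	toy_push_msg(r, true);
}

/* Analogue of tx_raft_msg_return(): the in-flight message came back. */
static void
toy_msg_return(struct toy_relay *r)
{
	assert(r->is_push_sent);
	r->is_push_sent = false;
	if (r->is_push_pending)
		toy_push_msg(r, true);
}

/* Analogue of tx_set_is_raft_enabled(). */
static void
toy_set_raft_enabled(struct toy_relay *r, bool value)
{
	r->is_raft_enabled = value;
	if (r->is_push_pending)
		toy_push_msg(r, false);
}
```

While one message is in flight, every new update overwrites the other one, so no more than two messages ever exist and nothing is allocated on the heap; intermediate updates are coalesced and only the latest one is sent.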

==================================

replication: send accumulated Raft messages after relay start

It may happen that a Raft leader fails to send a broadcast to a
freshly connected follower.

Here's what happens: a follower subscribes to a candidate during
on-going elections. box_process_subscribe() sends out the current
node's Raft state, which is "candidate". Suppose the relay from the
follower to the candidate is already set up. The follower immediately
responds to the vote request, which makes the candidate become the
leader. But the candidate's relay is not yet ready to process Raft
messages, so the is_leader message from the candidate is dropped. Once
the relay starts, it relays all the xlogs, but the follower rejects all
the data, because it hasn't received the is_leader notification from
the candidate.

Fix this by sending the last dropped message as soon as the relay
starts dispatching Raft messages.

Also, while we're at it, rework relay_push_raft() to use a pair of
pre-allocated Raft messages instead of allocating a new one on every
Raft state update.

Follow-up #5445
---
  src/box/relay.cc | 122 ++++++++++++++++++++++++++++++++---------------
  1 file changed, 83 insertions(+), 39 deletions(-)

diff --git a/src/box/relay.cc b/src/box/relay.cc
index 7be33ee31..85f335cd7 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -87,6 +87,19 @@ struct relay_gc_msg {
      struct vclock vclock;
  };

+/**
+ * Cbus message to push raft messages to relay.
+ */
+struct relay_raft_msg {
+    struct cmsg base;
+    struct cmsg_hop route[2];
+    struct raft_request req;
+    struct vclock vclock;
+    bool do_restart_recovery;
+    struct relay *relay;
+};
+
+
  /** State of a replication relay. */
  struct relay {
      /** The thread in which we relay data to the replica. */
@@ -160,6 +173,24 @@ struct relay {
           * anonymous replica, for example.
           */
          bool is_raft_enabled;
+        /**
+         * A pair of raft messages travelling between tx and relay
+         * threads. While one is en route, the other is ready to save
+         * the next incoming raft message.
+         */
+        struct relay_raft_msg raft_msgs[2];
+        /**
+         * Id of the raft message waiting in tx thread and ready to
+         * save Raft requests. May be either 0 or 1.
+         */
+        int raft_ready_msg;
+        /** Whether raft_ready_msg holds a saved Raft message. */
+        bool is_raft_push_pending;
+        /**
+         * Whether any of the messages is en route between tx and
+         * relay.
+         */
+        bool is_raft_push_sent;
      } tx;
  };

@@ -628,13 +659,38 @@ struct relay_is_raft_enabled_msg {
      bool is_finished;
  };

+static void
+relay_push_raft_msg(struct relay *relay, bool do_restart_recovery)
+{
+    if (!relay->tx.is_raft_enabled || relay->tx.is_raft_push_sent)
+        return;
+    struct relay_raft_msg *msg =
+        &relay->tx.raft_msgs[relay->tx.raft_ready_msg];
+    msg->do_restart_recovery = do_restart_recovery;
+    cpipe_push(&relay->relay_pipe, &msg->base);
+    relay->tx.raft_ready_msg = (relay->tx.raft_ready_msg + 1) % 2;
+    relay->tx.is_raft_push_sent = true;
+    relay->tx.is_raft_push_pending = false;
+}
+
  /** TX thread part of the Raft flag setting, first hop. */
  static void
  tx_set_is_raft_enabled(struct cmsg *base)
  {
      struct relay_is_raft_enabled_msg *msg =
          (struct relay_is_raft_enabled_msg *)base;
-    msg->relay->tx.is_raft_enabled = msg->value;
+    struct relay *relay = msg->relay;
+    relay->tx.is_raft_enabled = msg->value;
+    /*
+     * Send saved raft message as soon as relay becomes operational.
+     * Do not restart recovery upon the message arrival. Recovery is
+     * positioned at replica_clock initially, i.e. already "restarted" and
+     * restarting it once again would position it at the oldest xlog
+     * possible, because relay reader hasn't received replica vclock yet.
+     */
+    if (relay->tx.is_raft_push_pending) {
+        relay_push_raft_msg(relay, false);
+    }
  }

  /** Relay thread part of the Raft flag setting, second hop. */
@@ -930,14 +986,10 @@ relay_restart_recovery(struct relay *relay)
      recover_remaining_wals(relay->r, &relay->stream, NULL, true);
  }

-struct relay_raft_msg {
-    struct cmsg base;
-    struct cmsg_hop route;
-    struct raft_request req;
-    struct vclock vclock;
-    struct relay *relay;
-};
-
+/**
+ * Send a Raft message to the peer. This is done asynchronously, out of scope
+ * of recover_remaining_wals loop.
+ */
  static void
  relay_raft_msg_push(struct cmsg *base)
  {
@@ -951,54 +1003,46 @@ relay_raft_msg_push(struct cmsg *base)
           * would be ignored again.
           */
          relay_send(msg->relay, &row);
-        if (msg->req.state == RAFT_STATE_LEADER)
+        if (msg->req.state == RAFT_STATE_LEADER &&
+            msg->do_restart_recovery)
              relay_restart_recovery(msg->relay);
      } catch (Exception *e) {
          relay_set_error(msg->relay, e);
          fiber_cancel(fiber());
      }
-    free(msg);
+}
+
+static void
+tx_raft_msg_return(struct cmsg *base)
+{
+    struct relay_raft_msg *msg = (struct relay_raft_msg *)base;
+    msg->relay->tx.is_raft_push_sent = false;
+    if (msg->relay->tx.is_raft_push_pending)
+        relay_push_raft_msg(msg->relay, true);
  }

  void
  relay_push_raft(struct relay *relay, const struct raft_request *req)
  {
+    struct relay_raft_msg *msg =
+        &relay->tx.raft_msgs[relay->tx.raft_ready_msg];
      /*
-     * Raft updates don't stack. They are thrown away if can't be pushed
-     * now. This is fine, as long as relay's live much longer that the
-     * timeouts in Raft are set.
-     */
-    if (!relay->tx.is_raft_enabled)
-        return;
-    /*
-     * XXX: the message should be preallocated. It should
-     * work like Kharon in IProto. Relay should have 2 raft
-     * messages rotating. When one is sent, the other can be
-     * updated and a flag is set. When the first message is
-     * sent, the control returns to TX thread, sees the set
-     * flag, rotates the buffers, and sends it again. And so
-     * on. This is how it can work in future, with 0 heap
-     * allocations. Current solution with alloc-per-update is
-     * good enough as a start. Another option - wait until all
-     * is moved to WAL thread, where this will all happen
-     * in one thread and will be much simpler.
+     * Overwrite the request in raft_ready_msg. Only the latest raft request
+     * is saved.
       */
-    struct relay_raft_msg *msg =
-        (struct relay_raft_msg *)malloc(sizeof(*msg));
-    if (msg == NULL) {
-        panic("Couldn't allocate raft message");
-        return;
-    }
      msg->req = *req;
      if (req->vclock != NULL) {
          msg->req.vclock = &msg->vclock;
          vclock_copy(&msg->vclock, req->vclock);
      }
-    msg->route.f = relay_raft_msg_push;
-    msg->route.pipe = NULL;
-    cmsg_init(&msg->base, &msg->route);
+    msg->route[0].f = relay_raft_msg_push;
+    msg->route[0].pipe = &relay->tx_pipe;
+    msg->route[1].f = tx_raft_msg_return;
+    msg->route[1].pipe = NULL;
+    cmsg_init(&msg->base, msg->route);
      msg->relay = relay;
-    cpipe_push(&relay->relay_pipe, &msg->base);
+    relay->tx.is_raft_push_pending = true;
+    relay_push_raft_msg(relay, true);
  }

  /** Send a single row to the client. */
-- 
2.24.3 (Apple Git-128)


-- 
Serge Petrenko

