[Tarantool-patches] [PATCH v4 13/12] replication: send accumulated Raft messages after relay start

Serge Petrenko sergepetrenko at tarantool.org
Mon Apr 19 15:11:08 MSK 2021



On 18.04.2021 19:03, Vladislav Shpilevoy wrote:
> Good job on the patch!
>
> See 4 comments below.
>
>> diff --git a/src/box/relay.cc b/src/box/relay.cc
>> index 7be33ee31..9fdd02bc1 100644
>> --- a/src/box/relay.cc
>> +++ b/src/box/relay.cc
>> @@ -160,6 +160,16 @@ struct relay {
>>            * anonymous replica, for example.
>>            */
>>           bool is_raft_enabled;
>> +        /** Is set to true by the first Raft broadcast which comes while


Hi! Thanks for the review!

My answers are mostly moot now, since I took your advice from the last
comment and reworked the commit. Anyway, here they are; the new
patch is below.


> 1. Should be a new line after /**.

Sorry for the typo, fixed.

>
>> +         * the relay is not yet ready to dispatch Raft messages.
>> +         */
>> +        bool has_pending_broadcast;
>> +        /**
>> +         * A Raft broadcast which should be pushed once relay notifies
>> +         * tx it needs Raft updates. Otherwise this message would be
>> +         * lost until some new Raft event happens.
>> +         */
>> +        struct raft_request pending_broadcast;
> 2. I wouldn't call them 'broadcasts'. Relay sends a single message to
> the remote node, not to all the nodes. This is a broadcast on the raft
> level. On relay level it is just a single message to one node.

Ok, let it be `pending_raft_msg` and `has_pending_raft_msg` then.

>
>>       } tx;
>>   };
>> @@ -635,14 +649,28 @@ tx_set_is_raft_enabled(struct cmsg *base)
>>       struct relay_is_raft_enabled_msg *msg =
>>           (struct relay_is_raft_enabled_msg *)base;
>>       msg->relay->tx.is_raft_enabled = msg->value;
>> +    if (msg->relay->tx.has_pending_broadcast) {
>> +        msg->has_pending_broadcast = true;
>> +        msg->req = msg->relay->tx.pending_broadcast;
> 3. Since you will deliver the broadcast now, it is not pending
> anymore. Hence there must be msg->relay->tx.has_pending_broadcast = false
> in the end.

Yep, fixed.

>
>> +    }
>>   }
>> @@ -964,12 +1008,15 @@ void
>>   relay_push_raft(struct relay *relay, const struct raft_request *req)
>>   {
>>       /*
>> -     * Raft updates don't stack. They are thrown away if can't be pushed
>> -     * now. This is fine, as long as relay's live much longer that the
>> -     * timeouts in Raft are set.
>> +     * Remember the latest Raft update. It might be a notification that
>> +     * this node is a leader. If sometime later we find out this node needs
>> +     * Raft updates, we need to send is_leader notification.
>>        */
>> -    if (!relay->tx.is_raft_enabled)
>> +    if (!relay->tx.is_raft_enabled) {
>> +        relay->tx.has_pending_broadcast = true;
>> +        relay->tx.pending_broadcast = *req;
> 4. Vclock memory does not belong to the request. This is why below we copy
> it into the message's memory. You might need to do the same here.

Yes, indeed. Thanks!
>
>>           return;
>> +    }
>>       /*
>>        * XXX: the message should be preallocated. It should
>>        * work like Kharon in IProto. Relay should have 2 raft
> We could also fix it like described in this XXX, could we?

Yep. I didn't realise that at first. The new patch is below.

==================================

replication: send accumulated Raft messages after relay start

It may happen that a Raft leader fails to send a broadcast to the
freshly connected follower.

Here's what happens: a follower subscribes to a candidate during
on-going elections. box_process_subscribe() sends out the current node's
Raft state, which is "candidate". Suppose the relay from the follower to
the candidate is already set up. The follower immediately responds to the
vote request, which makes the candidate become leader. But the candidate's
relay is not yet ready to process Raft messages, so the is_leader message
from the candidate gets rejected. Once the relay starts, it relays all the
xlogs, but the follower rejects all the data, because it hasn't received
the is_leader notification from the candidate.

Fix this by sending the last rejected message as soon as relay starts
dispatching Raft messages.

Also, while we're at it, rework relay_push_raft to use a pair of
pre-allocated Raft messages instead of allocating a new one on every
Raft state update.

Follow-up #5445
---
  src/box/relay.cc | 122 ++++++++++++++++++++++++++++++++---------------
  1 file changed, 83 insertions(+), 39 deletions(-)

diff --git a/src/box/relay.cc b/src/box/relay.cc
index 7be33ee31..85f335cd7 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -87,6 +87,19 @@ struct relay_gc_msg {
      struct vclock vclock;
  };

+/**
+ * Cbus message to push raft messages to relay.
+ */
+struct relay_raft_msg {
+    struct cmsg base;
+    struct cmsg_hop route[2];
+    struct raft_request req;
+    struct vclock vclock;
+    bool do_restart_recovery;
+    struct relay *relay;
+};
+
+
  /** State of a replication relay. */
  struct relay {
      /** The thread in which we relay data to the replica. */
@@ -160,6 +173,24 @@ struct relay {
           * anonymous replica, for example.
           */
          bool is_raft_enabled;
+        /**
+         * A pair of raft messages travelling between tx and relay
+         * threads. While one is en route, the other is ready to save
+         * the next incoming raft message.
+         */
+        struct relay_raft_msg raft_msgs[2];
+        /**
+         * Id of the raft message waiting in tx thread and ready to
+         * save Raft requests. May be either 0 or 1.
+         */
+        int raft_ready_msg;
+        /** Whether raft_ready_msg holds a saved Raft message */
+        bool is_raft_push_pending;
+        /**
+         * Whether any of the messages is en route between tx and
+         * relay.
+         */
+        bool is_raft_push_sent;
      } tx;
  };

@@ -628,13 +659,38 @@ struct relay_is_raft_enabled_msg {
      bool is_finished;
  };

+static void
+relay_push_raft_msg(struct relay *relay, bool do_restart_recovery)
+{
+    if (!relay->tx.is_raft_enabled || relay->tx.is_raft_push_sent)
+        return;
+    struct relay_raft_msg *msg =
+        &relay->tx.raft_msgs[relay->tx.raft_ready_msg];
+    msg->do_restart_recovery = do_restart_recovery;
+    cpipe_push(&relay->relay_pipe, &msg->base);
+    relay->tx.raft_ready_msg = (relay->tx.raft_ready_msg + 1) % 2;
+    relay->tx.is_raft_push_sent = true;
+    relay->tx.is_raft_push_pending = false;
+}
+
  /** TX thread part of the Raft flag setting, first hop. */
  static void
  tx_set_is_raft_enabled(struct cmsg *base)
  {
      struct relay_is_raft_enabled_msg *msg =
          (struct relay_is_raft_enabled_msg *)base;
-    msg->relay->tx.is_raft_enabled = msg->value;
+    struct relay *relay = msg->relay;
+    relay->tx.is_raft_enabled = msg->value;
+    /*
+     * Send saved raft message as soon as relay becomes operational.
+     * Do not restart recovery upon the message arrival. Recovery is
+     * positioned at replica_clock initially, i.e. already "restarted" and
+     * restarting it once again would position it at the oldest xlog
+     * possible, because relay reader hasn't received replica vclock yet.
+     */
+    if (relay->tx.is_raft_push_pending) {
+        relay_push_raft_msg(relay, false);
+    }
  }

  /** Relay thread part of the Raft flag setting, second hop. */
@@ -930,14 +986,10 @@ relay_restart_recovery(struct relay *relay)
      recover_remaining_wals(relay->r, &relay->stream, NULL, true);
  }

-struct relay_raft_msg {
-    struct cmsg base;
-    struct cmsg_hop route;
-    struct raft_request req;
-    struct vclock vclock;
-    struct relay *relay;
-};
-
+/**
+ * Send a Raft message to the peer. This is done asynchronously, out of scope
+ * of recover_remaining_wals loop.
+ */
  static void
  relay_raft_msg_push(struct cmsg *base)
  {
@@ -951,54 +1003,46 @@ relay_raft_msg_push(struct cmsg *base)
           * would be ignored again.
           */
          relay_send(msg->relay, &row);
-        if (msg->req.state == RAFT_STATE_LEADER)
+        if (msg->req.state == RAFT_STATE_LEADER &&
+            msg->do_restart_recovery)
              relay_restart_recovery(msg->relay);
      } catch (Exception *e) {
          relay_set_error(msg->relay, e);
          fiber_cancel(fiber());
      }
-    free(msg);
+}
+
+static void
+tx_raft_msg_return(struct cmsg *base)
+{
+    struct relay_raft_msg *msg = (struct relay_raft_msg *)base;
+    msg->relay->tx.is_raft_push_sent = false;
+    if (msg->relay->tx.is_raft_push_pending)
+        relay_push_raft_msg(msg->relay, true);
  }

  void
  relay_push_raft(struct relay *relay, const struct raft_request *req)
  {
+    struct relay_raft_msg *msg =
+        &relay->tx.raft_msgs[relay->tx.raft_ready_msg];
      /*
-     * Raft updates don't stack. They are thrown away if can't be pushed
-     * now. This is fine, as long as relay's live much longer that the
-     * timeouts in Raft are set.
-     */
-    if (!relay->tx.is_raft_enabled)
-        return;
-    /*
-     * XXX: the message should be preallocated. It should
-     * work like Kharon in IProto. Relay should have 2 raft
-     * messages rotating. When one is sent, the other can be
-     * updated and a flag is set. When the first message is
-     * sent, the control returns to TX thread, sees the set
-     * flag, rotates the buffers, and sends it again. And so
-     * on. This is how it can work in future, with 0 heap
-     * allocations. Current solution with alloc-per-update is
-     * good enough as a start. Another option - wait until all
-     * is moved to WAL thread, where this will all happen
-     * in one thread and will be much simpler.
+     * Overwrite the request in raft_ready_msg. Only the latest raft request
+     * is saved.
       */
-    struct relay_raft_msg *msg =
-        (struct relay_raft_msg *)malloc(sizeof(*msg));
-    if (msg == NULL) {
-        panic("Couldn't allocate raft message");
-        return;
-    }
      msg->req = *req;
      if (req->vclock != NULL) {
          msg->req.vclock = &msg->vclock;
          vclock_copy(&msg->vclock, req->vclock);
      }
-    msg->route.f = relay_raft_msg_push;
-    msg->route.pipe = NULL;
-    cmsg_init(&msg->base, &msg->route);
+    msg->route[0].f = relay_raft_msg_push;
+    msg->route[0].pipe = &relay->tx_pipe;
+    msg->route[1].f = tx_raft_msg_return;
+    msg->route[1].pipe = NULL;
+    cmsg_init(&msg->base, msg->route);
      msg->relay = relay;
-    cpipe_push(&relay->relay_pipe, &msg->base);
+    relay->tx.is_raft_push_pending = true;
+    relay_push_raft_msg(relay, true);
  }

  /** Send a single row to the client. */
-- 
2.24.3 (Apple Git-128)


-- 
Serge Petrenko


