[Tarantool-patches] [PATCH v4 13/12] replication: send accumulated Raft messages after relay start

Serge Petrenko sergepetrenko at tarantool.org
Wed Apr 21 08:59:19 MSK 2021



21.04.2021 01:31, Vladislav Shpilevoy пишет:
> Thanks for the patch!
>
> As discussed in private, there is a way to drop this flag, which I
> did in a separate commit on top of this one, see below and on the
> branch sp/gh-5445-election-fixes-review:

Yes, thanks for reviewing this!

Squashed.

> ====================
>      [tosquash] Remove flag do_restart_recovery
>
> diff --git a/src/box/relay.cc b/src/box/relay.cc
> index 85f335cd7..ff43c2fc7 100644
> --- a/src/box/relay.cc
> +++ b/src/box/relay.cc
> @@ -95,7 +95,6 @@ struct relay_raft_msg {
>   	struct cmsg_hop route[2];
>   	struct raft_request req;
>   	struct vclock vclock;
> -	bool do_restart_recovery;
>   	struct relay *relay;
>   };
>   
> @@ -433,6 +432,12 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock,
>   		relay_delete(relay);
>   	});
>   
> +	/*
> +	 * Save the first vclock as 'received': firstly, it really was
> +	 * received; secondly, recv_vclock is used by recovery restart and
> +	 * must always be valid.
> +	 */
> +	vclock_copy(&relay->recv_vclock, start_vclock);
>   	relay->r = recovery_new(wal_dir(), false, start_vclock);
>   	vclock_copy(&relay->stop_vclock, stop_vclock);
>   
> @@ -660,13 +665,12 @@ struct relay_is_raft_enabled_msg {
>   };
>   
>   static void
> -relay_push_raft_msg(struct relay *relay, bool do_restart_recovery)
> +relay_push_raft_msg(struct relay *relay)
>   {
>   	if (!relay->tx.is_raft_enabled || relay->tx.is_raft_push_sent)
>   		return;
>   	struct relay_raft_msg *msg =
>   		&relay->tx.raft_msgs[relay->tx.raft_ready_msg];
> -	msg->do_restart_recovery = do_restart_recovery;
>   	cpipe_push(&relay->relay_pipe, &msg->base);
>   	relay->tx.raft_ready_msg = (relay->tx.raft_ready_msg + 1) % 2;
>   	relay->tx.is_raft_push_sent = true;
> @@ -681,16 +685,9 @@ tx_set_is_raft_enabled(struct cmsg *base)
>   		(struct relay_is_raft_enabled_msg *)base;
>   	struct relay *relay  = msg->relay;
>   	relay->tx.is_raft_enabled = msg->value;
> -	/*
> -	 * Send saved raft message as soon as relay becomes operational.
> -	 * Do not restart recovery upon the message arrival. Recovery is
> -	 * positioned at replica_clock initially, i.e. already "restarted" and
> -	 * restarting it once again would position it at the oldest xlog
> -	 * possible, because relay reader hasn't received replica vclock yet.
> -	 */
> -	if (relay->tx.is_raft_push_pending) {
> -		relay_push_raft_msg(msg->relay, false);
> -	}
> +	/* Send saved raft message as soon as relay becomes operational. */
> +	if (relay->tx.is_raft_push_pending)
> +		relay_push_raft_msg(msg->relay);
>   }
>   
>   /** Relay thread part of the Raft flag setting, second hop. */
> @@ -901,6 +898,12 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
>   	});
>   
>   	vclock_copy(&relay->local_vclock_at_subscribe, &replicaset.vclock);
> +	/*
> +	 * Save the first vclock as 'received': firstly, it really was
> +	 * received; secondly, recv_vclock is used by recovery restart and
> +	 * must always be valid.
> +	 */
> +	vclock_copy(&relay->recv_vclock, replica_clock);
>   	relay->r = recovery_new(wal_dir(), false, replica_clock);
>   	vclock_copy(&relay->tx.vclock, replica_clock);
>   	relay->version_id = replica_version_id;
> @@ -1003,8 +1006,7 @@ relay_raft_msg_push(struct cmsg *base)
>   		 * would be ignored again.
>   		 */
>   		relay_send(msg->relay, &row);
> -		if (msg->req.state == RAFT_STATE_LEADER &&
> -		    msg->do_restart_recovery)
> +		if (msg->req.state == RAFT_STATE_LEADER)
>   			relay_restart_recovery(msg->relay);
>   	} catch (Exception *e) {
>   		relay_set_error(msg->relay, e);
> @@ -1018,7 +1020,7 @@ tx_raft_msg_return(struct cmsg *base)
>   	struct relay_raft_msg *msg = (struct relay_raft_msg *)base;
>   	msg->relay->tx.is_raft_push_sent = false;
>   	if (msg->relay->tx.is_raft_push_pending)
> -		relay_push_raft_msg(msg->relay, true);
> +		relay_push_raft_msg(msg->relay);
>   }
>   
>   void
> @@ -1042,7 +1044,7 @@ relay_push_raft(struct relay *relay, const struct raft_request *req)
>   	cmsg_init(&msg->base, msg->route);
>   	msg->relay = relay;
>   	relay->tx.is_raft_push_pending = true;
> -	relay_push_raft_msg(relay, true);
> +	relay_push_raft_msg(relay);
>   }
>   
>   /** Send a single row to the client. */

-- 
Serge Petrenko



More information about the Tarantool-patches mailing list