[Tarantool-patches] [PATCH v4 13/12] replication: send accumulated Raft messages after relay start
Vladislav Shpilevoy
v.shpilevoy at tarantool.org
Wed Apr 21 01:31:02 MSK 2021
Thanks for the patch!
As discussed privately, there is a way to drop this flag. I did it
in a separate commit on top of this one; see it below and on the
branch sp/gh-5445-election-fixes-review:
====================
[tosquash] Remove flag do_restart_recovery
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 85f335cd7..ff43c2fc7 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -95,7 +95,6 @@ struct relay_raft_msg {
struct cmsg_hop route[2];
struct raft_request req;
struct vclock vclock;
- bool do_restart_recovery;
struct relay *relay;
};
@@ -433,6 +432,12 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock,
relay_delete(relay);
});
+ /*
+ * Save the first vclock as 'received'. Because firstly, it was really
+ * received. Secondly, recv_vclock is used by recovery restart and must
+ * always be valid.
+ */
+ vclock_copy(&relay->recv_vclock, start_vclock);
relay->r = recovery_new(wal_dir(), false, start_vclock);
vclock_copy(&relay->stop_vclock, stop_vclock);
@@ -660,13 +665,12 @@ struct relay_is_raft_enabled_msg {
};
static void
-relay_push_raft_msg(struct relay *relay, bool do_restart_recovery)
+relay_push_raft_msg(struct relay *relay)
{
if (!relay->tx.is_raft_enabled || relay->tx.is_raft_push_sent)
return;
struct relay_raft_msg *msg =
&relay->tx.raft_msgs[relay->tx.raft_ready_msg];
- msg->do_restart_recovery = do_restart_recovery;
cpipe_push(&relay->relay_pipe, &msg->base);
relay->tx.raft_ready_msg = (relay->tx.raft_ready_msg + 1) % 2;
relay->tx.is_raft_push_sent = true;
@@ -681,16 +685,9 @@ tx_set_is_raft_enabled(struct cmsg *base)
(struct relay_is_raft_enabled_msg *)base;
struct relay *relay = msg->relay;
relay->tx.is_raft_enabled = msg->value;
- /*
- * Send saved raft message as soon as relay becomes operational.
- * Do not restart recovery upon the message arrival. Recovery is
- * positioned at replica_clock initially, i.e. already "restarted" and
- * restarting it once again would position it at the oldest xlog
- * possible, because relay reader hasn't received replica vclock yet.
- */
- if (relay->tx.is_raft_push_pending) {
- relay_push_raft_msg(msg->relay, false);
- }
+ /* Send saved raft message as soon as relay becomes operational. */
+ if (relay->tx.is_raft_push_pending)
+ relay_push_raft_msg(msg->relay);
}
/** Relay thread part of the Raft flag setting, second hop. */
@@ -901,6 +898,12 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
});
vclock_copy(&relay->local_vclock_at_subscribe, &replicaset.vclock);
+ /*
+ * Save the first vclock as 'received'. Because firstly, it was really
+ * received. Secondly, recv_vclock is used by recovery restart and must
+ * always be valid.
+ */
+ vclock_copy(&relay->recv_vclock, replica_clock);
relay->r = recovery_new(wal_dir(), false, replica_clock);
vclock_copy(&relay->tx.vclock, replica_clock);
relay->version_id = replica_version_id;
@@ -1003,8 +1006,7 @@ relay_raft_msg_push(struct cmsg *base)
* would be ignored again.
*/
relay_send(msg->relay, &row);
- if (msg->req.state == RAFT_STATE_LEADER &&
- msg->do_restart_recovery)
+ if (msg->req.state == RAFT_STATE_LEADER)
relay_restart_recovery(msg->relay);
} catch (Exception *e) {
relay_set_error(msg->relay, e);
@@ -1018,7 +1020,7 @@ tx_raft_msg_return(struct cmsg *base)
struct relay_raft_msg *msg = (struct relay_raft_msg *)base;
msg->relay->tx.is_raft_push_sent = false;
if (msg->relay->tx.is_raft_push_pending)
- relay_push_raft_msg(msg->relay, true);
+ relay_push_raft_msg(msg->relay);
}
void
@@ -1042,7 +1044,7 @@ relay_push_raft(struct relay *relay, const struct raft_request *req)
cmsg_init(&msg->base, msg->route);
msg->relay = relay;
relay->tx.is_raft_push_pending = true;
- relay_push_raft_msg(relay, true);
+ relay_push_raft_msg(relay);
}
/** Send a single row to the client. */
More information about the Tarantool-patches
mailing list