[tarantool-patches] [PATCH] relay: send heartbeats while skipping WALs already applied by replica

Roman Tokarev dmarc-noreply at freelists.org
Thu Sep 5 16:32:13 MSK 2019


Currently, the relay doesn't send heartbeat messages while skipping rows
already applied by a replica, because heartbeats are sent from the same
thread in which the relay reads WALs.

To allow heartbeats to be sent, this patch introduces the 'heartbeater'
fiber, which sends them if no rows were sent to a replica during the
replication timeout. To allow this fiber to be executed, the main fiber,
which reads WALs, yields after every 10K skipped rows.

Fixes #4461
---
 src/box/box.cc      |  3 ++
 src/box/recovery.cc | 34 +++++++++--------
 src/box/relay.cc    | 91 ++++++++++++++++++++++++++++++++-------------
 3 files changed, 88 insertions(+), 40 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index ac10c21ad..c779044d5 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -326,6 +326,9 @@ recovery_journal_create(struct recovery_journal *journal, struct vclock *v)
 static void
 apply_wal_row(struct xstream *stream, struct xrow_header *row)
 {
+	if (row == NULL)
+		return;
+
 	struct request request;
 	xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
 	if (request.type != IPROTO_NOP) {
diff --git a/src/box/recovery.cc b/src/box/recovery.cc
index d122d618a..3de5d7053 100644
--- a/src/box/recovery.cc
+++ b/src/box/recovery.cc
@@ -256,23 +256,27 @@ recover_xlog(struct recovery *r, struct xstream *stream,
 		if (stop_vclock != NULL &&
 		    r->vclock.signature >= stop_vclock->signature)
 			return;
+		int rc;
 		int64_t current_lsn = vclock_get(&r->vclock, row.replica_id);
 		if (row.lsn <= current_lsn)
-			continue; /* already applied, skip */
-
-		/*
-		 * All rows in xlog files have an assigned
-		 * replica id.
-		 */
-		assert(row.replica_id != 0);
-		/*
-		 * We can promote the vclock either before or
-		 * after xstream_write(): it only makes any impact
-		 * in case of forced recovery, when we skip the
-		 * failed row anyway.
-		 */
-		vclock_follow_xrow(&r->vclock, &row);
-		if (xstream_write(stream, &row) == 0) {
+			/* already applied, skip */
+			rc = xstream_write(stream, NULL);
+		else {
+			/*
+			 * All rows in xlog files have an assigned
+			 * replica id.
+			 */
+			assert(row.replica_id != 0);
+			/*
+			 * We can promote the vclock either before or
+			 * after xstream_write(): it only makes any impact
+			 * in case of forced recovery, when we skip the
+			 * failed row anyway.
+			 */
+			vclock_follow_xrow(&r->vclock, &row);
+			rc = xstream_write(stream, &row);
+		}
+		if (rc == 0) {
 			++row_count;
 			if (row_count % 100000 == 0)
 				say_info("%.1fM rows processed",
diff --git a/src/box/relay.cc b/src/box/relay.cc
index a19abf6a9..baf694bd3 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -128,6 +128,8 @@ struct relay {
 	 * confirmation from the replica.
 	 */
 	struct stailq pending_gc;
+	/** Number of rows processed. */
+	uint64_t processed;
 	/** Time when last row was sent to peer. */
 	double last_row_time;
 	/** Relay sync state. */
@@ -517,12 +519,48 @@ relay_send_heartbeat(struct relay *relay)
 {
 	struct xrow_header row;
 	xrow_encode_timestamp(&row, instance_id, ev_now(loop()));
+	relay_send(relay, &row);
+}
+
+/*
+ * Relay heartbeater fiber function.
+ * Send heartbeats to replica if no xrow has been sent during
+ *  replication_timeout.
+ */
+int
+relay_heartbeater_f(va_list ap)
+{
+	struct relay *relay = va_arg(ap, struct relay *);
+	struct fiber *relay_f = va_arg(ap, struct fiber *);
+
 	try {
-		relay_send(relay, &row);
+		/*
+		 * If the replica happens to be up to date on subscribe,
+		 * don't wait for timeout to happen - send a heartbeat
+		 * message right away to update the replication lag as
+		 * soon as possible.
+		 */
+		relay_send_heartbeat(relay);
+
+		while (!fiber_is_cancelled()) {
+			double timeout = replication_timeout;
+			struct errinj *inj = errinj(ERRINJ_RELAY_REPORT_INTERVAL,
+						    ERRINJ_DOUBLE);
+			if (inj != NULL && inj->dparam != 0)
+				timeout = inj->dparam;
+
+			/* Check for a heartbeat timeout. */
+			if (ev_monotonic_now(loop()) - relay->last_row_time > timeout)
+				relay_send_heartbeat(relay);
+
+			fiber_sleep(timeout);
+		}
 	} catch (Exception *e) {
 		relay_set_error(relay, e);
-		fiber_cancel(fiber());
+		fiber_cancel(relay_f);
 	}
+
+	return 0;
 }
 
 /**
@@ -551,10 +589,6 @@ relay_subscribe_f(va_list ap)
 	};
 	trigger_add(&r->on_close_log, &on_close_log);
 
-	/* Setup WAL watcher for sending new rows to the replica. */
-	wal_set_watcher(&relay->wal_watcher, relay->endpoint.name,
-			relay_process_wal_event, cbus_process);
-
 	/* Start fiber for receiving replica acks. */
 	char name[FIBER_NAME_MAX];
 	snprintf(name, sizeof(name), "%s:%s", fiber()->name, "reader");
@@ -562,27 +596,22 @@ relay_subscribe_f(va_list ap)
 	fiber_set_joinable(reader, true);
 	fiber_start(reader, relay, fiber());
 
-	/*
-	 * If the replica happens to be up to date on subscribe,
-	 * don't wait for timeout to happen - send a heartbeat
-	 * message right away to update the replication lag as
-	 * soon as possible.
-	 */
-	relay_send_heartbeat(relay);
+	/* Start fiber for sending heartbeats to replica. */
+	snprintf(name, sizeof(name), "%s:%s", fiber()->name, "heartbeater");
+	struct fiber *heartbeater = fiber_new_xc(name, relay_heartbeater_f);
+	fiber_set_joinable(heartbeater, true);
+	fiber_start(heartbeater, relay, fiber());
+
+	/* Setup WAL watcher for sending new rows to the replica. */
+	wal_set_watcher(&relay->wal_watcher, relay->endpoint.name,
+			relay_process_wal_event, cbus_process);
 
 	/*
 	 * Run the event loop until the connection is broken
 	 * or an error occurs.
 	 */
 	while (!fiber_is_cancelled()) {
-		double timeout = replication_timeout;
-		struct errinj *inj = errinj(ERRINJ_RELAY_REPORT_INTERVAL,
-					    ERRINJ_DOUBLE);
-		if (inj != NULL && inj->dparam != 0)
-			timeout = inj->dparam;
-
-		fiber_cond_wait_deadline(&relay->reader_cond,
-					 relay->last_row_time + timeout);
+		fiber_cond_wait(&relay->reader_cond);
 
 		/*
 		 * The fiber can be woken by IO cancel, by a timeout of
@@ -590,9 +619,6 @@ relay_subscribe_f(va_list ap)
 		 * Handle cbus messages first.
 		 */
 		cbus_process(&relay->endpoint);
-		/* Check for a heartbeat timeout. */
-		if (ev_monotonic_now(loop()) - relay->last_row_time > timeout)
-			relay_send_heartbeat(relay);
 		/*
 		 * Check that the vclock has been updated and the previous
 		 * status message is delivered
@@ -631,9 +657,11 @@ relay_subscribe_f(va_list ap)
 	trigger_clear(&on_close_log);
 	wal_clear_watcher(&relay->wal_watcher, cbus_process);
 
-	/* Join ack reader fiber. */
+	/* Join ack reader & heartbeater fibers. */
 	fiber_cancel(reader);
+	fiber_cancel(heartbeater);
 	fiber_join(reader);
+	fiber_join(heartbeater);
 
 	/* Destroy cpipe to tx. */
 	cbus_unpair(&relay->tx_pipe, &relay->relay_pipe,
@@ -716,6 +744,19 @@ static void
 relay_send_row(struct xstream *stream, struct xrow_header *packet)
 {
 	struct relay *relay = container_of(stream, struct relay, stream);
+
+	relay->processed++;
+	if (packet == NULL) {
+		if (relay->processed % 10000 != 0)
+			return;
+
+		ev_now_update(loop());
+		if (ev_monotonic_now(loop()) - relay->last_row_time > replication_timeout)
+			fiber_yield_timeout(0);
+
+		return;
+	}
+
 	assert(iproto_type_is_dml(packet->type));
 	/*
 	 * Transform replica local requests to IPROTO_NOP so as to
-- 
2.20.1





More information about the Tarantool-patches mailing list