Tarantool development patches archive
 help / color / mirror / Atom feed
* [tarantool-patches] [PATCH] relay: send heartbeats while skipping WALs already applied by replica
@ 2019-09-05 13:32 Roman Tokarev
  2019-09-06  7:20 ` [tarantool-patches] " Georgy Kirichenko
  0 siblings, 1 reply; 3+ messages in thread
From: Roman Tokarev @ 2019-09-05 13:32 UTC (permalink / raw)
  To: tarantool-patches

Currently, relay doesn't send heartbeat messages while skipping rows
already applied by a replica, because heartbeats are sent from the same
fiber that reads the WALs.

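To allow heartbeats to be sent, this patch introduces a 'heartbeater' fiber
that sends one whenever no rows have been sent to the replica within the
replication timeout. recover_xlog() now passes a NULL row to the stream for
every row it skips; stream implementations (apply_wal_row(), relay_send_row())
must ignore such rows. To give the heartbeater a chance to run, the main fiber
that reads WALs checks every 10K rows while skipping whether the replication
timeout has expired since the last row was sent and, if so, yields.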

Fixes #4461
---
 src/box/box.cc      |  3 ++
 src/box/recovery.cc | 34 +++++++++--------
 src/box/relay.cc    | 91 ++++++++++++++++++++++++++++++++-------------
 3 files changed, 88 insertions(+), 40 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index ac10c21ad..c779044d5 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -326,6 +326,9 @@ recovery_journal_create(struct recovery_journal *journal, struct vclock *v)
 static void
 apply_wal_row(struct xstream *stream, struct xrow_header *row)
 {
+	if (row == NULL)
+		return;
+
 	struct request request;
 	xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
 	if (request.type != IPROTO_NOP) {
diff --git a/src/box/recovery.cc b/src/box/recovery.cc
index d122d618a..3de5d7053 100644
--- a/src/box/recovery.cc
+++ b/src/box/recovery.cc
@@ -256,23 +256,27 @@ recover_xlog(struct recovery *r, struct xstream *stream,
 		if (stop_vclock != NULL &&
 		    r->vclock.signature >= stop_vclock->signature)
 			return;
+		int rc;
 		int64_t current_lsn = vclock_get(&r->vclock, row.replica_id);
 		if (row.lsn <= current_lsn)
-			continue; /* already applied, skip */
-
-		/*
-		 * All rows in xlog files have an assigned
-		 * replica id.
-		 */
-		assert(row.replica_id != 0);
-		/*
-		 * We can promote the vclock either before or
-		 * after xstream_write(): it only makes any impact
-		 * in case of forced recovery, when we skip the
-		 * failed row anyway.
-		 */
-		vclock_follow_xrow(&r->vclock, &row);
-		if (xstream_write(stream, &row) == 0) {
+			/* already applied, skip */
+			rc = xstream_write(stream, NULL);
+		else {
+			/*
+			 * All rows in xlog files have an assigned
+			 * replica id.
+			 */
+			assert(row.replica_id != 0);
+			/*
+			 * We can promote the vclock either before or
+			 * after xstream_write(): it only makes any impact
+			 * in case of forced recovery, when we skip the
+			 * failed row anyway.
+			 */
+			vclock_follow_xrow(&r->vclock, &row);
+			rc = xstream_write(stream, &row);
+		}
+		if (rc == 0) {
 			++row_count;
 			if (row_count % 100000 == 0)
 				say_info("%.1fM rows processed",
diff --git a/src/box/relay.cc b/src/box/relay.cc
index a19abf6a9..baf694bd3 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -128,6 +128,8 @@ struct relay {
 	 * confirmation from the replica.
 	 */
 	struct stailq pending_gc;
+	/** Number of rows passed to relay_send_row(), incl. skipped. */
+	uint64_t processed;
 	/** Time when last row was sent to peer. */
 	double last_row_time;
 	/** Relay sync state. */
@@ -517,12 +519,48 @@ relay_send_heartbeat(struct relay *relay)
 {
 	struct xrow_header row;
 	xrow_encode_timestamp(&row, instance_id, ev_now(loop()));
+	relay_send(relay, &row);
+}
+
+/**
+ * Heartbeater fiber (args: struct relay *, relay fiber *).
+ * Sends a heartbeat if no row was sent within replication_timeout;
+ * on exception, records the error and cancels the relay fiber.
+ */
+int
+relay_heartbeater_f(va_list ap)
+{
+	struct relay *relay = va_arg(ap, struct relay *);
+	struct fiber *relay_f = va_arg(ap, struct fiber *);
+
 	try {
-		relay_send(relay, &row);
+		/*
+		 * If the replica happens to be up to date on subscribe,
+		 * don't wait for timeout to happen - send a heartbeat
+		 * message right away to update the replication lag as
+		 * soon as possible.
+		 */
+		relay_send_heartbeat(relay);
+
+		while (!fiber_is_cancelled()) {
+			double timeout = replication_timeout;
+			struct errinj *inj = errinj(ERRINJ_RELAY_REPORT_INTERVAL,
+						    ERRINJ_DOUBLE);
+			if (inj != NULL && inj->dparam != 0)
+				timeout = inj->dparam;
+
+			/* Nothing sent within the timeout: send a heartbeat. */
+			if (ev_monotonic_now(loop()) - relay->last_row_time > timeout)
+				relay_send_heartbeat(relay);
+
+			fiber_sleep(timeout);
+		}
 	} catch (Exception *e) {
 		relay_set_error(relay, e);
-		fiber_cancel(fiber());
+		fiber_cancel(relay_f);
 	}
+
+	return 0;
 }
 
 /**
@@ -551,10 +589,6 @@ relay_subscribe_f(va_list ap)
 	};
 	trigger_add(&r->on_close_log, &on_close_log);
 
-	/* Setup WAL watcher for sending new rows to the replica. */
-	wal_set_watcher(&relay->wal_watcher, relay->endpoint.name,
-			relay_process_wal_event, cbus_process);
-
 	/* Start fiber for receiving replica acks. */
 	char name[FIBER_NAME_MAX];
 	snprintf(name, sizeof(name), "%s:%s", fiber()->name, "reader");
@@ -562,27 +596,22 @@ relay_subscribe_f(va_list ap)
 	fiber_set_joinable(reader, true);
 	fiber_start(reader, relay, fiber());
 
-	/*
-	 * If the replica happens to be up to date on subscribe,
-	 * don't wait for timeout to happen - send a heartbeat
-	 * message right away to update the replication lag as
-	 * soon as possible.
-	 */
-	relay_send_heartbeat(relay);
+	/* Start fiber for sending heartbeats to replica. */
+	snprintf(name, sizeof(name), "%s:%s", fiber()->name, "heartbeater");
+	struct fiber *heartbeater = fiber_new_xc(name, relay_heartbeater_f);
+	fiber_set_joinable(heartbeater, true);
+	fiber_start(heartbeater, relay, fiber());
+
+	/* Setup WAL watcher for sending new rows to the replica. */
+	wal_set_watcher(&relay->wal_watcher, relay->endpoint.name,
+			relay_process_wal_event, cbus_process);
 
 	/*
 	 * Run the event loop until the connection is broken
 	 * or an error occurs.
 	 */
 	while (!fiber_is_cancelled()) {
-		double timeout = replication_timeout;
-		struct errinj *inj = errinj(ERRINJ_RELAY_REPORT_INTERVAL,
-					    ERRINJ_DOUBLE);
-		if (inj != NULL && inj->dparam != 0)
-			timeout = inj->dparam;
-
-		fiber_cond_wait_deadline(&relay->reader_cond,
-					 relay->last_row_time + timeout);
+		fiber_cond_wait(&relay->reader_cond);
 
 		/*
 		 * The fiber can be woken by IO cancel, by a timeout of
@@ -590,9 +619,6 @@ relay_subscribe_f(va_list ap)
 		 * Handle cbus messages first.
 		 */
 		cbus_process(&relay->endpoint);
-		/* Check for a heartbeat timeout. */
-		if (ev_monotonic_now(loop()) - relay->last_row_time > timeout)
-			relay_send_heartbeat(relay);
 		/*
 		 * Check that the vclock has been updated and the previous
 		 * status message is delivered
@@ -631,9 +657,11 @@ relay_subscribe_f(va_list ap)
 	trigger_clear(&on_close_log);
 	wal_clear_watcher(&relay->wal_watcher, cbus_process);
 
-	/* Join ack reader fiber. */
+	/* Join ack reader & heartbeater fibers. */
 	fiber_cancel(reader);
+	fiber_cancel(heartbeater);
 	fiber_join(reader);
+	fiber_join(heartbeater);
 
 	/* Destroy cpipe to tx. */
 	cbus_unpair(&relay->tx_pipe, &relay->relay_pipe,
@@ -716,6 +744,19 @@ static void
 relay_send_row(struct xstream *stream, struct xrow_header *packet)
 {
 	struct relay *relay = container_of(stream, struct relay, stream);
+
+	relay->processed++;
+	if (packet == NULL) {
+		if (relay->processed % 10000 != 0)
+			return;
+
+		ev_now_update(loop());
+		if (ev_monotonic_now(loop()) - relay->last_row_time > replication_timeout)
+			fiber_yield_timeout(0);
+
+		return;
+	}
+
 	assert(iproto_type_is_dml(packet->type));
 	/*
 	 * Transform replica local requests to IPROTO_NOP so as to
-- 
2.20.1

^ permalink raw reply	[flat|nested] 3+ messages in thread

end of thread, other threads:[~2019-09-06  8:31 UTC | newest]

Thread overview: 3+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2019-09-05 13:32 [tarantool-patches] [PATCH] relay: send heartbeats while skipping WALs already applied by replica Roman Tokarev
2019-09-06  7:20 ` [tarantool-patches] " Georgy Kirichenko
2019-09-06  8:31   ` Georgy Kirichenko

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox