[PATCH 3/5] relay: cleanup error handling

Vladimir Davydov vdavydov.dev at gmail.com
Sat Dec 29 00:21:49 MSK 2018


A few changes intended to make error messages clearer, remove
duplicates, etc:

 - Don't log an error when xstream_write() fails in recover_xlog() -
   it's the responsibility of the caller. Logging it there results in
   the same error occurring twice in the log.
 - If recover_xlog() fails to apply a row and continues due to the
   force_recovery flag, log the row's LSN - it might be useful for
   problem analysis.
 - Don't override relay error in relay_process_wal_event(), otherwise
   we can get 'fiber is cancelled' error in the status, which is
   meaningless.
 - Break replication if we fail to send an ack as it's pointless to
   continue then.
 - Log a relay error only once - when the relay thread is exiting.
   Don't log subsequent errors - they don't make much sense.
 - Set the relay cord name before setting the WAL watcher: the WAL
   watcher sends an event as soon as it's installed, which starts xlog
   recovery, which is logged by the relay, so we want the relay name to
   be valid. Note, there's a catch here: we used to use the original
   cord name as the cbus endpoint name, so now we have to pass the
   endpoint name explicitly - this looks better anyway.

While we are at it, let's also add some comments to relay_subscribe_f()
and remove the diag_is_empty() check, as the diag is always set when the
relay exits.

Part of #3910
---
 src/box/recovery.cc |  6 +++--
 src/box/relay.cc    | 75 ++++++++++++++++++++++++++++++++---------------------
 2 files changed, 49 insertions(+), 32 deletions(-)

diff --git a/src/box/recovery.cc b/src/box/recovery.cc
index c3cc7454..e95b03e2 100644
--- a/src/box/recovery.cc
+++ b/src/box/recovery.cc
@@ -277,8 +277,6 @@ recover_xlog(struct recovery *r, struct xstream *stream,
 				say_info("%.1fM rows processed",
 					 row_count / 1000000.);
 		} else {
-			say_error("can't apply row: ");
-			diag_log();
 			/*
 			 * Stop recovery if a system error occurred,
 			 * no matter if force_recovery is set or not,
@@ -291,6 +289,10 @@ recover_xlog(struct recovery *r, struct xstream *stream,
 			if (!r->wal_dir.force_recovery ||
 			    !type_assignable(&type_ClientError, e->type))
 				diag_raise();
+
+			say_error("skipping row {%u: %lld}",
+				  (unsigned)row.replica_id, (long long)row.lsn);
+			diag_log();
 		}
 	}
 }
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 3d9703ea..988c01d3 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -406,6 +406,14 @@ relay_schedule_pending_gc(struct relay *relay, const struct vclock *vclock)
 }
 
 static void
+relay_set_error(struct relay *relay, struct error *e)
+{
+	/* Don't override existing error. */
+	if (diag_is_empty(&relay->diag))
+		diag_add_error(&relay->diag, e);
+}
+
+static void
 relay_process_wal_event(struct wal_watcher *watcher, unsigned events)
 {
 	struct relay *relay = container_of(watcher, struct relay, wal_watcher);
@@ -425,8 +433,7 @@ relay_process_wal_event(struct wal_watcher *watcher, unsigned events)
 		recover_remaining_wals(relay->r, &relay->stream, NULL,
 				       (events & WAL_EVENT_ROTATE) != 0);
 	} catch (Exception *e) {
-		e->log();
-		diag_move(diag_get(), &relay->diag);
+		relay_set_error(relay, e);
 		fiber_cancel(fiber());
 	}
 }
@@ -456,17 +463,8 @@ relay_reader_f(va_list ap)
 			fiber_cond_signal(&relay->reader_cond);
 		}
 	} catch (Exception *e) {
-		if (diag_is_empty(&relay->diag)) {
-			/* Don't override existing error. */
-			diag_move(diag_get(), &relay->diag);
-			fiber_cancel(relay_f);
-		} else if (!fiber_is_cancelled()) {
-			/*
-			 * There is an relay error and this fiber
-			 * fiber has another, log it.
-			 */
-			e->log();
-		}
+		relay_set_error(relay, e);
+		fiber_cancel(relay_f);
 	}
 	ibuf_destroy(&ibuf);
 	return 0;
@@ -483,7 +481,8 @@ relay_send_heartbeat(struct relay *relay)
 	try {
 		relay_send(relay, &row);
 	} catch (Exception *e) {
-		e->log();
+		relay_set_error(relay, e);
+		fiber_cancel(fiber());
 	}
 }
 
@@ -499,20 +498,25 @@ relay_subscribe_f(va_list ap)
 	struct recovery *r = relay->r;
 
 	coio_enable();
-	cbus_endpoint_create(&relay->endpoint, cord_name(cord()),
+	relay_set_cord_name(relay->io.fd);
+
+	/* Create cpipe to tx for propagating vclock. */
+	cbus_endpoint_create(&relay->endpoint, tt_sprintf("relay_%p", relay),
 			     fiber_schedule_cb, fiber());
-	cbus_pair("tx", cord_name(cord()), &relay->tx_pipe, &relay->relay_pipe,
-		  NULL, NULL, cbus_process);
+	cbus_pair("tx", relay->endpoint.name, &relay->tx_pipe,
+		  &relay->relay_pipe, NULL, NULL, cbus_process);
+
 	/* Setup garbage collection trigger. */
 	struct trigger on_close_log = {
 		RLIST_LINK_INITIALIZER, relay_on_close_log_f, relay, NULL
 	};
 	trigger_add(&r->on_close_log, &on_close_log);
-	wal_set_watcher(&relay->wal_watcher, cord_name(cord()),
-			relay_process_wal_event, cbus_process);
 
-	relay_set_cord_name(relay->io.fd);
+	/* Setup WAL watcher for sending new rows to the replica. */
+	wal_set_watcher(&relay->wal_watcher, relay->endpoint.name,
+			relay_process_wal_event, cbus_process);
 
+	/* Start fiber for receiving replica acks. */
 	char name[FIBER_NAME_MAX];
 	snprintf(name, sizeof(name), "%s:%s", fiber()->name, "reader");
 	struct fiber *reader = fiber_new_xc(name, relay_reader_f);
@@ -527,6 +531,10 @@ relay_subscribe_f(va_list ap)
 	 */
 	relay_send_heartbeat(relay);
 
+	/*
+	 * Run the event loop until the connection is broken
+	 * or an error occurs.
+	 */
 	while (!fiber_is_cancelled()) {
 		double timeout = replication_timeout;
 		struct errinj *inj = errinj(ERRINJ_RELAY_REPORT_INTERVAL,
@@ -571,26 +579,33 @@ relay_subscribe_f(va_list ap)
 		relay_schedule_pending_gc(relay, send_vclock);
 	}
 
+	/*
+	 * Log the error that caused the relay to break the loop.
+	 * Don't clear the error for status reporting.
+	 */
+	assert(!diag_is_empty(&relay->diag));
+	diag_add_error(diag_get(), diag_last_error(&relay->diag));
+	diag_log();
 	say_crit("exiting the relay loop");
+
+	/* Clear garbage collector trigger and WAL watcher. */
 	trigger_clear(&on_close_log);
 	wal_clear_watcher(&relay->wal_watcher, cbus_process);
-	if (!fiber_is_dead(reader))
-		fiber_cancel(reader);
+
+	/* Join ack reader fiber. */
+	fiber_cancel(reader);
 	fiber_join(reader);
+
+	/* Destroy cpipe to tx. */
 	cbus_unpair(&relay->tx_pipe, &relay->relay_pipe,
 		    NULL, NULL, cbus_process);
 	cbus_endpoint_destroy(&relay->endpoint, cbus_process);
-	if (!diag_is_empty(&relay->diag)) {
-		/* An error has occurred while reading ACKs of xlog. */
-		diag_move(&relay->diag, diag_get());
-		/* Reference the diag in the status. */
-		diag_add_error(&relay->diag, diag_last_error(diag_get()));
-	}
+
 	struct errinj *inj = errinj(ERRINJ_RELAY_EXIT_DELAY, ERRINJ_DOUBLE);
 	if (inj != NULL && inj->dparam > 0)
 		fiber_sleep(inj->dparam);
 
-	return diag_is_empty(diag_get()) ? 0: -1;
+	return -1;
 }
 
 /** Replication acceptor fiber handler. */
@@ -621,7 +636,7 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
 	vclock_copy(&relay->tx.vclock, replica_clock);
 	relay->version_id = replica_version_id;
 
-	int rc = cord_costart(&relay->cord, tt_sprintf("relay_%p", relay),
+	int rc = cord_costart(&relay->cord, "subscribe",
 			      relay_subscribe_f, relay);
 	if (rc == 0)
 		rc = cord_cojoin(&relay->cord);
-- 
2.11.0




More information about the Tarantool-patches mailing list