Tarantool development patches archive
 help / color / mirror / Atom feed
From: Vladimir Davydov <vdavydov.dev@gmail.com>
To: tarantool-patches@freelists.org
Subject: [PATCH 3/5] relay: cleanup error handling
Date: Sat, 29 Dec 2018 00:21:49 +0300	[thread overview]
Message-ID: <506b293105a17da606a860b71cddbcd2bba02230.1546030880.git.vdavydov.dev@gmail.com> (raw)
In-Reply-To: <cover.1546030880.git.vdavydov.dev@gmail.com>
In-Reply-To: <cover.1546030880.git.vdavydov.dev@gmail.com>

A few changes intended to make error messages clearer, remove
duplicates, etc.:

 - Don't log an error when xstream_write() fails in recover_xlog() -
   it's the responsibility of the caller. Logging it there results in
   the same error occurring twice in the log.
 - If recover_xlog() fails to apply a row and continues due to
   force_recovery flag, log the row's LSN - it might be useful for
   problem analysis.
 - Don't override relay error in relay_process_wal_event(), otherwise
   we can get 'fiber is cancelled' error in the status, which is
   meaningless.
 - Break replication if we fail to send an ack as it's pointless to
   continue then.
 - Log a relay error only once - when the relay thread is exiting.
   Don't log subsequent errors - they don't make much sense.
 - Set the relay cord name before setting the WAL watcher: the WAL watcher
   sends an event as soon as it's installed, which starts xlog recovery,
   which is logged by the relay, so we want the relay name to be valid.
   Note, there's a catch here: we used the original cord name as the cbus
   endpoint name, so now we have to pass the endpoint name explicitly -
   this looks better anyway.

While we are at it, let's also add some comments to relay_subscribe_f()
and remove diag_is_empty() check as diag is always set when relay exits.

Part of #3910
---
 src/box/recovery.cc |  6 +++--
 src/box/relay.cc    | 75 ++++++++++++++++++++++++++++++++---------------------
 2 files changed, 49 insertions(+), 32 deletions(-)

diff --git a/src/box/recovery.cc b/src/box/recovery.cc
index c3cc7454..e95b03e2 100644
--- a/src/box/recovery.cc
+++ b/src/box/recovery.cc
@@ -277,8 +277,6 @@ recover_xlog(struct recovery *r, struct xstream *stream,
 				say_info("%.1fM rows processed",
 					 row_count / 1000000.);
 		} else {
-			say_error("can't apply row: ");
-			diag_log();
 			/*
 			 * Stop recovery if a system error occurred,
 			 * no matter if force_recovery is set or not,
@@ -291,6 +289,10 @@ recover_xlog(struct recovery *r, struct xstream *stream,
 			if (!r->wal_dir.force_recovery ||
 			    !type_assignable(&type_ClientError, e->type))
 				diag_raise();
+
+			say_error("skipping row {%u: %lld}",
+				  (unsigned)row.replica_id, (long long)row.lsn);
+			diag_log();
 		}
 	}
 }
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 3d9703ea..988c01d3 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -406,6 +406,14 @@ relay_schedule_pending_gc(struct relay *relay, const struct vclock *vclock)
 }
 
 static void
+relay_set_error(struct relay *relay, struct error *e)
+{
+	/* Don't override existing error. */
+	if (diag_is_empty(&relay->diag))
+		diag_add_error(&relay->diag, e);
+}
+
+static void
 relay_process_wal_event(struct wal_watcher *watcher, unsigned events)
 {
 	struct relay *relay = container_of(watcher, struct relay, wal_watcher);
@@ -425,8 +433,7 @@ relay_process_wal_event(struct wal_watcher *watcher, unsigned events)
 		recover_remaining_wals(relay->r, &relay->stream, NULL,
 				       (events & WAL_EVENT_ROTATE) != 0);
 	} catch (Exception *e) {
-		e->log();
-		diag_move(diag_get(), &relay->diag);
+		relay_set_error(relay, e);
 		fiber_cancel(fiber());
 	}
 }
@@ -456,17 +463,8 @@ relay_reader_f(va_list ap)
 			fiber_cond_signal(&relay->reader_cond);
 		}
 	} catch (Exception *e) {
-		if (diag_is_empty(&relay->diag)) {
-			/* Don't override existing error. */
-			diag_move(diag_get(), &relay->diag);
-			fiber_cancel(relay_f);
-		} else if (!fiber_is_cancelled()) {
-			/*
-			 * There is an relay error and this fiber
-			 * fiber has another, log it.
-			 */
-			e->log();
-		}
+		relay_set_error(relay, e);
+		fiber_cancel(relay_f);
 	}
 	ibuf_destroy(&ibuf);
 	return 0;
@@ -483,7 +481,8 @@ relay_send_heartbeat(struct relay *relay)
 	try {
 		relay_send(relay, &row);
 	} catch (Exception *e) {
-		e->log();
+		relay_set_error(relay, e);
+		fiber_cancel(fiber());
 	}
 }
 
@@ -499,20 +498,25 @@ relay_subscribe_f(va_list ap)
 	struct recovery *r = relay->r;
 
 	coio_enable();
-	cbus_endpoint_create(&relay->endpoint, cord_name(cord()),
+	relay_set_cord_name(relay->io.fd);
+
+	/* Create cpipe to tx for propagating vclock. */
+	cbus_endpoint_create(&relay->endpoint, tt_sprintf("relay_%p", relay),
 			     fiber_schedule_cb, fiber());
-	cbus_pair("tx", cord_name(cord()), &relay->tx_pipe, &relay->relay_pipe,
-		  NULL, NULL, cbus_process);
+	cbus_pair("tx", relay->endpoint.name, &relay->tx_pipe,
+		  &relay->relay_pipe, NULL, NULL, cbus_process);
+
 	/* Setup garbage collection trigger. */
 	struct trigger on_close_log = {
 		RLIST_LINK_INITIALIZER, relay_on_close_log_f, relay, NULL
 	};
 	trigger_add(&r->on_close_log, &on_close_log);
-	wal_set_watcher(&relay->wal_watcher, cord_name(cord()),
-			relay_process_wal_event, cbus_process);
 
-	relay_set_cord_name(relay->io.fd);
+	/* Setup WAL watcher for sending new rows to the replica. */
+	wal_set_watcher(&relay->wal_watcher, relay->endpoint.name,
+			relay_process_wal_event, cbus_process);
 
+	/* Start fiber for receiving replica acks. */
 	char name[FIBER_NAME_MAX];
 	snprintf(name, sizeof(name), "%s:%s", fiber()->name, "reader");
 	struct fiber *reader = fiber_new_xc(name, relay_reader_f);
@@ -527,6 +531,10 @@ relay_subscribe_f(va_list ap)
 	 */
 	relay_send_heartbeat(relay);
 
+	/*
+	 * Run the event loop until the connection is broken
+	 * or an error occurs.
+	 */
 	while (!fiber_is_cancelled()) {
 		double timeout = replication_timeout;
 		struct errinj *inj = errinj(ERRINJ_RELAY_REPORT_INTERVAL,
@@ -571,26 +579,33 @@ relay_subscribe_f(va_list ap)
 		relay_schedule_pending_gc(relay, send_vclock);
 	}
 
+	/*
+	 * Log the error that caused the relay to break the loop.
+	 * Don't clear the error for status reporting.
+	 */
+	assert(!diag_is_empty(&relay->diag));
+	diag_add_error(diag_get(), diag_last_error(&relay->diag));
+	diag_log();
 	say_crit("exiting the relay loop");
+
+	/* Clear garbage collector trigger and WAL watcher. */
 	trigger_clear(&on_close_log);
 	wal_clear_watcher(&relay->wal_watcher, cbus_process);
-	if (!fiber_is_dead(reader))
-		fiber_cancel(reader);
+
+	/* Join ack reader fiber. */
+	fiber_cancel(reader);
 	fiber_join(reader);
+
+	/* Destroy cpipe to tx. */
 	cbus_unpair(&relay->tx_pipe, &relay->relay_pipe,
 		    NULL, NULL, cbus_process);
 	cbus_endpoint_destroy(&relay->endpoint, cbus_process);
-	if (!diag_is_empty(&relay->diag)) {
-		/* An error has occurred while reading ACKs of xlog. */
-		diag_move(&relay->diag, diag_get());
-		/* Reference the diag in the status. */
-		diag_add_error(&relay->diag, diag_last_error(diag_get()));
-	}
+
 	struct errinj *inj = errinj(ERRINJ_RELAY_EXIT_DELAY, ERRINJ_DOUBLE);
 	if (inj != NULL && inj->dparam > 0)
 		fiber_sleep(inj->dparam);
 
-	return diag_is_empty(diag_get()) ? 0: -1;
+	return -1;
 }
 
 /** Replication acceptor fiber handler. */
@@ -621,7 +636,7 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
 	vclock_copy(&relay->tx.vclock, replica_clock);
 	relay->version_id = replica_version_id;
 
-	int rc = cord_costart(&relay->cord, tt_sprintf("relay_%p", relay),
+	int rc = cord_costart(&relay->cord, "subscribe",
 			      relay_subscribe_f, relay);
 	if (rc == 0)
 		rc = cord_cojoin(&relay->cord);
-- 
2.11.0

  parent reply	other threads:[~2018-12-28 21:21 UTC|newest]

Thread overview: 13+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2018-12-28 21:21 [PATCH 0/5] Fix a couple of replication breakdown issues Vladimir Davydov
2018-12-28 21:21 ` [PATCH 1/5] recovery: stop writing to xstream on system error Vladimir Davydov
2018-12-29  9:09   ` [tarantool-patches] " Konstantin Osipov
2018-12-29  9:50     ` Vladimir Davydov
2018-12-29 10:57       ` Vladimir Davydov
2018-12-29 12:08         ` Konstantin Osipov
2018-12-28 21:21 ` [PATCH 2/5] relay: do not try to scan xlog if exiting Vladimir Davydov
2018-12-29  9:14   ` [tarantool-patches] " Konstantin Osipov
2018-12-29  9:53     ` Vladimir Davydov
2018-12-28 21:21 ` Vladimir Davydov [this message]
2018-12-28 21:21 ` [PATCH 4/5] relay: close xlog cursor in relay thread Vladimir Davydov
2018-12-28 21:21 ` [PATCH 5/5] xlog: assure xlog is opened and closed in the same thread Vladimir Davydov
2018-12-29 11:40 ` [PATCH 0/5] Fix a couple of replication breakdown issues Vladimir Davydov

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=506b293105a17da606a860b71cddbcd2bba02230.1546030880.git.vdavydov.dev@gmail.com \
    --to=vdavydov.dev@gmail.com \
    --cc=tarantool-patches@freelists.org \
    --subject='Re: [PATCH 3/5] relay: cleanup error handling' \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox