From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: From: Vladimir Davydov Subject: [PATCH 3/5] relay: cleanup error handling Date: Sat, 29 Dec 2018 00:21:49 +0300 Message-Id: <506b293105a17da606a860b71cddbcd2bba02230.1546030880.git.vdavydov.dev@gmail.com> In-Reply-To: References: In-Reply-To: References: To: tarantool-patches@freelists.org List-ID: A few changes intended to make error messages clearer, remove duplicates, etc: - Don't log an error when xstream_write() fails in recover_xlog() - it's a responsibility of the caller. Logging it there results in the same error occurring twice in the log. - If recover_xlog() fails to apply a row and continues due to the force_recovery flag, log the row's LSN - it might be useful for problem analysis. - Don't override relay error in relay_process_wal_event(), otherwise we can get 'fiber is cancelled' error in the status, which is meaningless. - Break replication if we fail to send a heartbeat as it's pointless to continue then. - Log a relay error only once - when the relay thread is exiting. Don't log subsequent errors - they don't make much sense. - Set the relay cord name before setting the WAL watcher: the WAL watcher sends an event as soon as it's installed, which starts xlog recovery, which is logged by the relay so we want the relay name to be valid. Note, there's a catch here: we used the original cord name as the cbus endpoint name so now we have to pass the endpoint name explicitly - this looks better anyway. While we are at it, let's also add some comments to relay_subscribe_f() and remove diag_is_empty() check as diag is always set when relay exits. 
Part of #3910 --- src/box/recovery.cc | 6 +++-- src/box/relay.cc | 75 ++++++++++++++++++++++++++++++++--------------------- 2 files changed, 49 insertions(+), 32 deletions(-) diff --git a/src/box/recovery.cc b/src/box/recovery.cc index c3cc7454..e95b03e2 100644 --- a/src/box/recovery.cc +++ b/src/box/recovery.cc @@ -277,8 +277,6 @@ recover_xlog(struct recovery *r, struct xstream *stream, say_info("%.1fM rows processed", row_count / 1000000.); } else { - say_error("can't apply row: "); - diag_log(); /* * Stop recovery if a system error occurred, * no matter if force_recovery is set or not, @@ -291,6 +289,10 @@ recover_xlog(struct recovery *r, struct xstream *stream, if (!r->wal_dir.force_recovery || !type_assignable(&type_ClientError, e->type)) diag_raise(); + + say_error("skipping row {%u: %lld}", + (unsigned)row.replica_id, (long long)row.lsn); + diag_log(); } } } diff --git a/src/box/relay.cc b/src/box/relay.cc index 3d9703ea..988c01d3 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -406,6 +406,14 @@ relay_schedule_pending_gc(struct relay *relay, const struct vclock *vclock) } static void +relay_set_error(struct relay *relay, struct error *e) +{ + /* Don't override existing error. */ + if (diag_is_empty(&relay->diag)) + diag_add_error(&relay->diag, e); +} + +static void relay_process_wal_event(struct wal_watcher *watcher, unsigned events) { struct relay *relay = container_of(watcher, struct relay, wal_watcher); @@ -425,8 +433,7 @@ relay_process_wal_event(struct wal_watcher *watcher, unsigned events) recover_remaining_wals(relay->r, &relay->stream, NULL, (events & WAL_EVENT_ROTATE) != 0); } catch (Exception *e) { - e->log(); - diag_move(diag_get(), &relay->diag); + relay_set_error(relay, e); fiber_cancel(fiber()); } } @@ -456,17 +463,8 @@ relay_reader_f(va_list ap) fiber_cond_signal(&relay->reader_cond); } } catch (Exception *e) { - if (diag_is_empty(&relay->diag)) { - /* Don't override existing error. 
*/ - diag_move(diag_get(), &relay->diag); - fiber_cancel(relay_f); - } else if (!fiber_is_cancelled()) { - /* - * There is an relay error and this fiber - * fiber has another, log it. - */ - e->log(); - } + relay_set_error(relay, e); + fiber_cancel(relay_f); } ibuf_destroy(&ibuf); return 0; @@ -483,7 +481,8 @@ relay_send_heartbeat(struct relay *relay) try { relay_send(relay, &row); } catch (Exception *e) { - e->log(); + relay_set_error(relay, e); + fiber_cancel(fiber()); } } @@ -499,20 +498,25 @@ relay_subscribe_f(va_list ap) struct recovery *r = relay->r; coio_enable(); - cbus_endpoint_create(&relay->endpoint, cord_name(cord()), + relay_set_cord_name(relay->io.fd); + + /* Create cpipe to tx for propagating vclock. */ + cbus_endpoint_create(&relay->endpoint, tt_sprintf("relay_%p", relay), fiber_schedule_cb, fiber()); - cbus_pair("tx", cord_name(cord()), &relay->tx_pipe, &relay->relay_pipe, - NULL, NULL, cbus_process); + cbus_pair("tx", relay->endpoint.name, &relay->tx_pipe, + &relay->relay_pipe, NULL, NULL, cbus_process); + /* Setup garbage collection trigger. */ struct trigger on_close_log = { RLIST_LINK_INITIALIZER, relay_on_close_log_f, relay, NULL }; trigger_add(&r->on_close_log, &on_close_log); - wal_set_watcher(&relay->wal_watcher, cord_name(cord()), - relay_process_wal_event, cbus_process); - relay_set_cord_name(relay->io.fd); + /* Setup WAL watcher for sending new rows to the replica. */ + wal_set_watcher(&relay->wal_watcher, relay->endpoint.name, + relay_process_wal_event, cbus_process); + /* Start fiber for receiving replica acks. */ char name[FIBER_NAME_MAX]; snprintf(name, sizeof(name), "%s:%s", fiber()->name, "reader"); struct fiber *reader = fiber_new_xc(name, relay_reader_f); @@ -527,6 +531,10 @@ relay_subscribe_f(va_list ap) */ relay_send_heartbeat(relay); + /* + * Run the event loop until the connection is broken + * or an error occurs. 
+ */ while (!fiber_is_cancelled()) { double timeout = replication_timeout; struct errinj *inj = errinj(ERRINJ_RELAY_REPORT_INTERVAL, @@ -571,26 +579,33 @@ relay_subscribe_f(va_list ap) relay_schedule_pending_gc(relay, send_vclock); } + /* + * Log the error that caused the relay to break the loop. + * Don't clear the error for status reporting. + */ + assert(!diag_is_empty(&relay->diag)); + diag_add_error(diag_get(), diag_last_error(&relay->diag)); + diag_log(); say_crit("exiting the relay loop"); + + /* Clear garbage collector trigger and WAL watcher. */ trigger_clear(&on_close_log); wal_clear_watcher(&relay->wal_watcher, cbus_process); - if (!fiber_is_dead(reader)) - fiber_cancel(reader); + + /* Join ack reader fiber. */ + fiber_cancel(reader); fiber_join(reader); + + /* Destroy cpipe to tx. */ cbus_unpair(&relay->tx_pipe, &relay->relay_pipe, NULL, NULL, cbus_process); cbus_endpoint_destroy(&relay->endpoint, cbus_process); - if (!diag_is_empty(&relay->diag)) { - /* An error has occurred while reading ACKs of xlog. */ - diag_move(&relay->diag, diag_get()); - /* Reference the diag in the status. */ - diag_add_error(&relay->diag, diag_last_error(diag_get())); - } + struct errinj *inj = errinj(ERRINJ_RELAY_EXIT_DELAY, ERRINJ_DOUBLE); if (inj != NULL && inj->dparam > 0) fiber_sleep(inj->dparam); - return diag_is_empty(diag_get()) ? 0: -1; + return -1; } /** Replication acceptor fiber handler. */ @@ -621,7 +636,7 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync, vclock_copy(&relay->tx.vclock, replica_clock); relay->version_id = replica_version_id; - int rc = cord_costart(&relay->cord, tt_sprintf("relay_%p", relay), + int rc = cord_costart(&relay->cord, "subscribe", relay_subscribe_f, relay); if (rc == 0) rc = cord_cojoin(&relay->cord); -- 2.11.0