From: Georgy Kirichenko
Date: Wed, 12 Feb 2020 12:39:11 +0300
Message-Id: <5cf963da3d429fdf99120295c9131fd439c70229.1581500169.git.georgy@tarantool.org>
Subject: [Tarantool-patches] [PATCH v4 02/11] recovery: do not throw an error
List-Id: Tarantool development patches
To: tarantool-patches@dev.tarantool.org

Relaying from the C-written WAL requires recovery to be C-compliant,
so get rid of exceptions in the recovery interface.

Part of #980
---
 src/box/box.cc      | 19 ++++++++--
 src/box/recovery.cc | 89 ++++++++++++++++++++++++++-------------------
 src/box/recovery.h  | 14 +++----
 src/box/relay.cc    | 15 ++++----
 4 files changed, 82 insertions(+), 55 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index 68038df18..611100b8b 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -2166,6 +2166,8 @@ local_recovery(const struct tt_uuid *instance_uuid,
 	recovery = recovery_new(cfg_gets("wal_dir"),
 				cfg_geti("force_recovery"),
 				checkpoint_vclock);
+	if (recovery == NULL)
+		diag_raise();
 
 	/*
 	 * Make sure we report the actual recovery position
@@ -2183,7 +2185,8 @@ local_recovery(const struct tt_uuid *instance_uuid,
 	 * so we must reflect this in replicaset vclock to
 	 * not attempt to apply these rows twice.
 	 */
-	recovery_scan(recovery, &replicaset.vclock, &gc.vclock);
+	if (recovery_scan(recovery, &replicaset.vclock, &gc.vclock) != 0)
+		diag_raise();
 	say_info("instance vclock %s", vclock_to_string(&replicaset.vclock));
 
 	if (wal_dir_lock >= 0) {
@@ -2226,7 +2229,8 @@ local_recovery(const struct tt_uuid *instance_uuid,
 	memtx_engine_recover_snapshot_xc(memtx, checkpoint_vclock);
 
 	engine_begin_final_recovery_xc();
-	recover_remaining_wals(recovery, &wal_stream.base, NULL, false);
+	if (recover_remaining_wals(recovery, &wal_stream.base, NULL, false) != 0)
+		diag_raise();
 	engine_end_recovery_xc();
 	/*
 	 * Leave hot standby mode, if any, only after
@@ -2239,6 +2243,10 @@ local_recovery(const struct tt_uuid *instance_uuid,
 				      cfg_getd("wal_dir_rescan_delay"));
 		while (true) {
 			if (path_lock(cfg_gets("wal_dir"), &wal_dir_lock)) {
+				/*
+				 * Let recovery_stop_local override
+				 * a path_lock error.
+				 */
 				recovery_stop_local(recovery);
 				diag_raise();
 			}
@@ -2246,8 +2254,11 @@ local_recovery(const struct tt_uuid *instance_uuid,
 				break;
 			fiber_sleep(0.1);
 		}
-		recovery_stop_local(recovery);
-		recover_remaining_wals(recovery, &wal_stream.base, NULL, true);
+		if (recovery_stop_local(recovery) != 0)
+			diag_raise();
+		if (recover_remaining_wals(recovery, &wal_stream.base, NULL,
+					   true) != 0)
+			diag_raise();
 		/*
 		 * Advance replica set vclock to reflect records
 		 * applied in hot standby mode.
diff --git a/src/box/recovery.cc b/src/box/recovery.cc
index a1ac2d967..e4aad1296 100644
--- a/src/box/recovery.cc
+++ b/src/box/recovery.cc
@@ -87,14 +87,11 @@ recovery_new(const char *wal_dirname, bool force_recovery,
 			calloc(1, sizeof(*r));
 
 	if (r == NULL) {
-		tnt_raise(OutOfMemory, sizeof(*r), "malloc",
-			  "struct recovery");
+		diag_set(OutOfMemory, sizeof(*r), "malloc",
+			 "struct recovery");
+		return NULL;
 	}
 
-	auto guard = make_scoped_guard([=]{
-		free(r);
-	});
-
 	xdir_create(&r->wal_dir, wal_dirname, XLOG, &INSTANCE_UUID,
 		    &xlog_opts_default);
 	r->wal_dir.force_recovery = force_recovery;
@@ -108,27 +105,31 @@
 	 * UUID, see replication/cluster.test for
 	 * details.
 	 */
-	xdir_check_xc(&r->wal_dir);
+	if (xdir_check(&r->wal_dir) != 0) {
+		xdir_destroy(&r->wal_dir);
+		free(r);
+		return NULL;
+	}
 
 	r->watcher = NULL;
 	rlist_create(&r->on_close_log);
 
-	guard.is_active = false;
 	return r;
 }
 
-void
+int
 recovery_scan(struct recovery *r, struct vclock *end_vclock,
 	      struct vclock *gc_vclock)
 {
-	xdir_scan_xc(&r->wal_dir);
+	if (xdir_scan(&r->wal_dir) != 0)
+		return -1;
 
 	if (xdir_last_vclock(&r->wal_dir, end_vclock) < 0 ||
 	    vclock_compare(end_vclock, &r->vclock) < 0) {
 		/* No xlogs after last checkpoint. */
 		vclock_copy(gc_vclock, &r->vclock);
 		vclock_copy(end_vclock, &r->vclock);
-		return;
+		return 0;
 	}
 
 	if (xdir_first_vclock(&r->wal_dir, gc_vclock) < 0)
@@ -137,11 +138,12 @@ recovery_scan(struct recovery *r, struct vclock *end_vclock,
 	/* Scan the last xlog to find end vclock. */
 	struct xlog_cursor cursor;
 	if (xdir_open_cursor(&r->wal_dir, vclock_sum(end_vclock), &cursor) != 0)
-		return;
+		return 0;
 	struct xrow_header row;
 	while (xlog_cursor_next(&cursor, &row, true) == 0)
 		vclock_follow_xrow(end_vclock, &row);
 	xlog_cursor_close(&cursor, false);
+	return 0;
 }
 
 static inline void
@@ -156,19 +158,21 @@ recovery_close_log(struct recovery *r)
 			 r->cursor.name);
 	}
 	xlog_cursor_close(&r->cursor, false);
-	trigger_run_xc(&r->on_close_log, NULL);
+	/* Suppress a trigger error if happened. */
+	trigger_run(&r->on_close_log, NULL);
 }
 
-static void
+static int
 recovery_open_log(struct recovery *r, const struct vclock *vclock)
 {
-	XlogGapError *e;
 	struct xlog_meta meta = r->cursor.meta;
 	enum xlog_cursor_state state = r->cursor.state;
 
 	recovery_close_log(r);
 
-	xdir_open_cursor_xc(&r->wal_dir, vclock_sum(vclock), &r->cursor);
+	if (xdir_open_cursor(&r->wal_dir, vclock_sum(vclock),
+			     &r->cursor) != 0)
+		return -1;
 
 	if (state == XLOG_CURSOR_NEW &&
 	    vclock_compare(vclock, &r->vclock) > 0) {
@@ -201,14 +205,14 @@ out:
 	 */
 	if (vclock_compare(&r->vclock, vclock) < 0)
 		vclock_copy(&r->vclock, vclock);
-	return;
+	return 0;
 
 gap_error:
-	e = tnt_error(XlogGapError, &r->vclock, vclock);
+	diag_set(XlogGapError, &r->vclock, vclock);
 	if (!r->wal_dir.force_recovery)
-		throw e;
+		return -1;
 	/* Ignore missing WALs if force_recovery is set. */
-	e->log();
+	diag_log();
 	say_warn("ignoring a gap in LSN");
 	goto out;
 }
@@ -217,7 +221,6 @@ void
 recovery_delete(struct recovery *r)
 {
 	assert(r->watcher == NULL);
-	trigger_destroy(&r->on_close_log);
 
 	xdir_destroy(&r->wal_dir);
 	if (xlog_cursor_is_open(&r->cursor)) {
@@ -237,25 +240,26 @@ recovery_delete(struct recovery *r)
  * The reading will be stopped on reaching stop_vclock.
  * Use NULL for boundless recover
  */
-static void
+static int
 recover_xlog(struct recovery *r, struct xstream *stream,
 	     const struct vclock *stop_vclock)
 {
 	struct xrow_header row;
 	uint64_t row_count = 0;
-	while (xlog_cursor_next_xc(&r->cursor, &row,
-				   r->wal_dir.force_recovery) == 0) {
+	int rc;
+	while ((rc = xlog_cursor_next(&r->cursor, &row,
+				      r->wal_dir.force_recovery)) == 0) {
 		/*
 		 * Read the next row from xlog file.
 		 *
-		 * xlog_cursor_next_xc() returns 1 when
+		 * xlog_cursor_next() returns 1 when
 		 * it can not read more rows. This doesn't mean
 		 * the file is fully read: it's fully read only
 		 * when EOF marker has been read, see i.eof_read
 		 */
 		if (stop_vclock != NULL &&
 		    r->vclock.signature >= stop_vclock->signature)
-			return;
+			return 0;
 		int64_t current_lsn = vclock_get(&r->vclock, row.replica_id);
 		if (row.lsn <= current_lsn)
 			continue; /* already applied, skip */
@@ -282,13 +286,16 @@ recover_xlog(struct recovery *r, struct xstream *stream,
 				  row_count / 1000000.);
 		} else {
 			if (!r->wal_dir.force_recovery)
-				diag_raise();
+				return -1;
 
 			say_error("skipping row {%u: %lld}",
 				  (unsigned)row.replica_id, (long long)row.lsn);
 			diag_log();
 		}
 	}
+	if (rc < 0)
+		return -1;
+	return 0;
 }
 
 /**
@@ -302,14 +309,14 @@ recover_xlog(struct recovery *r, struct xstream *stream,
  * This function will not close r->current_wal if
 * recovery was successful.
 */
-void
+int
 recover_remaining_wals(struct recovery *r, struct xstream *stream,
 		       const struct vclock *stop_vclock, bool scan_dir)
 {
 	struct vclock *clock;
 
-	if (scan_dir)
-		xdir_scan_xc(&r->wal_dir);
+	if (scan_dir && xdir_scan(&r->wal_dir) != 0)
+		return -1;
 
 	if (xlog_cursor_is_open(&r->cursor)) {
 		/* If there's a WAL open, recover from it first. */
@@ -343,21 +350,26 @@ recover_remaining_wals(struct recovery *r, struct xstream *stream,
 			continue;
 		}
 
-		recovery_open_log(r, clock);
+		if (recovery_open_log(r, clock) != 0)
+			return -1;
 
 		say_info("recover from `%s'", r->cursor.name);
 
 recover_current_wal:
-		recover_xlog(r, stream, stop_vclock);
+		if (recover_xlog(r, stream, stop_vclock) != 0)
+			return -1;
 	}
 
 	if (xlog_cursor_is_eof(&r->cursor))
 		recovery_close_log(r);
 
-	if (stop_vclock != NULL && vclock_compare(&r->vclock, stop_vclock) != 0)
-		tnt_raise(XlogGapError, &r->vclock, stop_vclock);
+	if (stop_vclock != NULL && vclock_compare(&r->vclock, stop_vclock) != 0) {
+		diag_set(XlogGapError, &r->vclock, stop_vclock);
+		return -1;
+	}
 
 	region_free(&fiber()->gc);
+	return 0;
 }
 
 void
@@ -481,7 +493,9 @@ hot_standby_f(va_list ap)
 	do {
 		start = vclock_sum(&r->vclock);
 
-		recover_remaining_wals(r, stream, NULL, scan_dir);
+		if (recover_remaining_wals(r, stream, NULL,
+					   scan_dir) != 0)
+			diag_raise();
 		end = vclock_sum(&r->vclock);
 
 		/*
@@ -529,7 +543,7 @@ recovery_follow_local(struct recovery *r, struct xstream *stream,
 	fiber_start(r->watcher, r, stream, wal_dir_rescan_delay);
 }
 
-void
+int
 recovery_stop_local(struct recovery *r)
 {
 	if (r->watcher) {
@@ -537,8 +551,9 @@ recovery_stop_local(struct recovery *r)
 		r->watcher = NULL;
 		fiber_cancel(f);
 		if (fiber_join(f) != 0)
-			diag_raise();
+			return -1;
 	}
+	return 0;
 }
 
 /* }}} */
diff --git a/src/box/recovery.h b/src/box/recovery.h
index 6e68abc0b..145d9199e 100644
--- a/src/box/recovery.h
+++ b/src/box/recovery.h
@@ -74,7 +74,7 @@ recovery_delete(struct recovery *r);
  * @gc_vclock is set to the oldest vclock available in the
  * WAL directory.
  */
-void
+int
 recovery_scan(struct recovery *r, struct vclock *end_vclock,
 	      struct vclock *gc_vclock);
 
@@ -82,16 +82,12 @@ void
 recovery_follow_local(struct recovery *r, struct xstream *stream,
 		      const char *name, ev_tstamp wal_dir_rescan_delay);
 
-void
+int
 recovery_stop_local(struct recovery *r);
 
 void
 recovery_finalize(struct recovery *r);
 
-#if defined(__cplusplus)
-} /* extern "C" */
-#endif /* defined(__cplusplus) */
-
 /**
  * Find out if there are new .xlog files since the current
  * vclock, and read them all up.
@@ -102,8 +98,12 @@ recovery_finalize(struct recovery *r);
 * This function will not close r->current_wal if
 * recovery was successful.
 */
-void
+int
 recover_remaining_wals(struct recovery *r, struct xstream *stream,
 		       const struct vclock *stop_vclock, bool scan_dir);
 
+#if defined(__cplusplus)
+} /* extern "C" */
+#endif /* defined(__cplusplus) */
+
 #endif /* TARANTOOL_RECOVERY_H_INCLUDED */
diff --git a/src/box/relay.cc b/src/box/relay.cc
index b89632273..d5a1c9c68 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -334,8 +334,9 @@ relay_final_join_f(va_list ap)
 
 	/* Send all WALs until stop_vclock */
 	assert(relay->stream.write != NULL);
-	recover_remaining_wals(relay->r, &relay->stream,
-			       &relay->stop_vclock, true);
+	if (recover_remaining_wals(relay->r, &relay->stream,
+				   &relay->stop_vclock, true) != 0)
+		diag_raise();
 	assert(vclock_compare(&relay->r->vclock, &relay->stop_vclock) == 0);
 	return 0;
 }
@@ -491,11 +492,9 @@ relay_process_wal_event(struct wal_watcher *watcher, unsigned events)
 		 */
 		return;
 	}
-	try {
-		recover_remaining_wals(relay->r, &relay->stream, NULL,
-				       (events & WAL_EVENT_ROTATE) != 0);
-	} catch (Exception *e) {
-		relay_set_error(relay, e);
+	if (recover_remaining_wals(relay->r, &relay->stream, NULL,
+				   (events & WAL_EVENT_ROTATE) != 0) != 0) {
+		relay_set_error(relay, diag_last_error(diag_get()));
 		fiber_cancel(fiber());
 	}
 }
@@ -702,6 +701,8 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
 	vclock_copy(&relay->local_vclock_at_subscribe, &replicaset.vclock);
 	relay->r = recovery_new(cfg_gets("wal_dir"), false,
 				replica_clock);
+	if (relay->r == NULL)
+		diag_raise();
 	vclock_copy(&relay->tx.vclock, replica_clock);
 	relay->version_id = replica_version_id;
 
-- 
2.25.0
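
For readers who want to see the calling convention this series moves recovery to,
here is a minimal standalone sketch (not part of the patch): functions report
failure by saving a diagnostic and returning -1/NULL instead of throwing, and
C++ callers convert the saved diagnostic back into an exception at the boundary.
The diag_set()/diag_raise() helpers below are simplified stand-ins for
Tarantool's real diag API, mocked so the example compiles on its own.

#include <cstdio>
#include <stdexcept>
#include <string>

/* Stand-in for the per-fiber diagnostics area (struct diag). */
static thread_local std::string last_error;

/* Record an error description; mimics diag_set(). */
static void diag_set(const std::string &msg) { last_error = msg; }

/* Turn the saved error into a C++ exception; mimics diag_raise(). */
static void diag_raise() { throw std::runtime_error(last_error); }

/* C-compliant style used by the patched recovery code: 0 on success, -1 on error. */
static int recovery_scan_like(bool fail)
{
	if (fail) {
		diag_set("XlogGapError: missing xlog");
		return -1;
	}
	return 0;
}

int main()
{
	/* A C caller (e.g. the WAL relaying code) just checks the return value. */
	if (recovery_scan_like(false) != 0)
		return 1;

	/* A C++ caller (e.g. box.cc) re-raises the saved diagnostics. */
	try {
		if (recovery_scan_like(true) != 0)
			diag_raise();
	} catch (const std::exception &e) {
		std::printf("caught: %s\n", e.what());
	}
	return 0;
}

With this shape, C code such as the WAL writer can call into recovery directly,
while the existing C++ call sites keep exception-based handling by checking the
return value and calling diag_raise().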