[Tarantool-patches] [PATCH v4 02/11] recovery: do not throw an error
Georgy Kirichenko
georgy at tarantool.org
Wed Feb 12 12:39:11 MSK 2020
Relaying from the C-written WAL requires recovery to be C-compliant, so
get rid of exceptions in the recovery interface.
Part of #980
---
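Not part of the patch, just to illustrate the convention used throughout
the series: a C-compliant function returns -1/NULL and records the error
in the fiber's diagnostics area with diag_set(), and a C++ caller restores
the old throwing behaviour with diag_raise(). The names below (struct
thing, thing_new) are made up for the example; the shape mirrors the
recovery_new()/box.cc hunks further down.

#include <stdlib.h>
#include "diag.h"	/* diag_set(), diag_raise() */

struct thing {
	int dummy;
};

/* C-compliant constructor: no exception crosses this boundary. */
static struct thing *
thing_new(void)
{
	struct thing *t = (struct thing *)calloc(1, sizeof(*t));
	if (t == NULL) {
		/* Record the error instead of tnt_raise(). */
		diag_set(OutOfMemory, sizeof(*t), "calloc", "struct thing");
		return NULL;
	}
	return t;
}

/* C++ call site keeps the exception-based flow. */
struct thing *t = thing_new();
if (t == NULL)
	diag_raise();
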
src/box/box.cc | 19 ++++++++--
src/box/recovery.cc | 89 ++++++++++++++++++++++++++-------------------
src/box/recovery.h | 14 +++----
src/box/relay.cc | 15 ++++----
4 files changed, 82 insertions(+), 55 deletions(-)
diff --git a/src/box/box.cc b/src/box/box.cc
index 68038df18..611100b8b 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -2166,6 +2166,8 @@ local_recovery(const struct tt_uuid *instance_uuid,
recovery = recovery_new(cfg_gets("wal_dir"),
cfg_geti("force_recovery"),
checkpoint_vclock);
+ if (recovery == NULL)
+ diag_raise();
/*
* Make sure we report the actual recovery position
@@ -2183,7 +2185,8 @@ local_recovery(const struct tt_uuid *instance_uuid,
* so we must reflect this in replicaset vclock to
* not attempt to apply these rows twice.
*/
- recovery_scan(recovery, &replicaset.vclock, &gc.vclock);
+ if (recovery_scan(recovery, &replicaset.vclock, &gc.vclock) != 0)
+ diag_raise();
say_info("instance vclock %s", vclock_to_string(&replicaset.vclock));
if (wal_dir_lock >= 0) {
@@ -2226,7 +2229,8 @@ local_recovery(const struct tt_uuid *instance_uuid,
memtx_engine_recover_snapshot_xc(memtx, checkpoint_vclock);
engine_begin_final_recovery_xc();
- recover_remaining_wals(recovery, &wal_stream.base, NULL, false);
+ if (recover_remaining_wals(recovery, &wal_stream.base, NULL, false) != 0)
+ diag_raise();
engine_end_recovery_xc();
/*
* Leave hot standby mode, if any, only after
@@ -2239,6 +2243,10 @@ local_recovery(const struct tt_uuid *instance_uuid,
cfg_getd("wal_dir_rescan_delay"));
while (true) {
if (path_lock(cfg_gets("wal_dir"), &wal_dir_lock)) {
+ /*
+ * Let recovery_stop_local override
+ * a path_lock error.
+ */
recovery_stop_local(recovery);
diag_raise();
}
@@ -2246,8 +2254,11 @@ local_recovery(const struct tt_uuid *instance_uuid,
break;
fiber_sleep(0.1);
}
- recovery_stop_local(recovery);
- recover_remaining_wals(recovery, &wal_stream.base, NULL, true);
+ if (recovery_stop_local(recovery) != 0)
+ diag_raise();
+ if (recover_remaining_wals(recovery, &wal_stream.base, NULL,
+ true) != 0)
+ diag_raise();
/*
* Advance replica set vclock to reflect records
* applied in hot standby mode.
diff --git a/src/box/recovery.cc b/src/box/recovery.cc
index a1ac2d967..e4aad1296 100644
--- a/src/box/recovery.cc
+++ b/src/box/recovery.cc
@@ -87,14 +87,11 @@ recovery_new(const char *wal_dirname, bool force_recovery,
calloc(1, sizeof(*r));
if (r == NULL) {
- tnt_raise(OutOfMemory, sizeof(*r), "malloc",
- "struct recovery");
+ diag_set(OutOfMemory, sizeof(*r), "malloc",
+ "struct recovery");
+ return NULL;
}
- auto guard = make_scoped_guard([=]{
- free(r);
- });
-
xdir_create(&r->wal_dir, wal_dirname, XLOG, &INSTANCE_UUID,
&xlog_opts_default);
r->wal_dir.force_recovery = force_recovery;
@@ -108,27 +105,31 @@ recovery_new(const char *wal_dirname, bool force_recovery,
* UUID, see replication/cluster.test for
* details.
*/
- xdir_check_xc(&r->wal_dir);
+ if (xdir_check(&r->wal_dir) != 0) {
+ xdir_destroy(&r->wal_dir);
+ free(r);
+ return NULL;
+ }
r->watcher = NULL;
rlist_create(&r->on_close_log);
- guard.is_active = false;
return r;
}
-void
+int
recovery_scan(struct recovery *r, struct vclock *end_vclock,
struct vclock *gc_vclock)
{
- xdir_scan_xc(&r->wal_dir);
+ if (xdir_scan(&r->wal_dir) != 0)
+ return -1;
if (xdir_last_vclock(&r->wal_dir, end_vclock) < 0 ||
vclock_compare(end_vclock, &r->vclock) < 0) {
/* No xlogs after last checkpoint. */
vclock_copy(gc_vclock, &r->vclock);
vclock_copy(end_vclock, &r->vclock);
- return;
+ return 0;
}
if (xdir_first_vclock(&r->wal_dir, gc_vclock) < 0)
@@ -137,11 +138,12 @@ recovery_scan(struct recovery *r, struct vclock *end_vclock,
/* Scan the last xlog to find end vclock. */
struct xlog_cursor cursor;
if (xdir_open_cursor(&r->wal_dir, vclock_sum(end_vclock), &cursor) != 0)
- return;
+ return 0;
struct xrow_header row;
while (xlog_cursor_next(&cursor, &row, true) == 0)
vclock_follow_xrow(end_vclock, &row);
xlog_cursor_close(&cursor, false);
+ return 0;
}
static inline void
@@ -156,19 +158,21 @@ recovery_close_log(struct recovery *r)
r->cursor.name);
}
xlog_cursor_close(&r->cursor, false);
- trigger_run_xc(&r->on_close_log, NULL);
+ /* Suppress a trigger error if happened. */
+ trigger_run(&r->on_close_log, NULL);
}
-static void
+static int
recovery_open_log(struct recovery *r, const struct vclock *vclock)
{
- XlogGapError *e;
struct xlog_meta meta = r->cursor.meta;
enum xlog_cursor_state state = r->cursor.state;
recovery_close_log(r);
- xdir_open_cursor_xc(&r->wal_dir, vclock_sum(vclock), &r->cursor);
+ if (xdir_open_cursor(&r->wal_dir, vclock_sum(vclock),
+ &r->cursor) != 0)
+ return -1;
if (state == XLOG_CURSOR_NEW &&
vclock_compare(vclock, &r->vclock) > 0) {
@@ -201,14 +205,14 @@ out:
*/
if (vclock_compare(&r->vclock, vclock) < 0)
vclock_copy(&r->vclock, vclock);
- return;
+ return 0;
gap_error:
- e = tnt_error(XlogGapError, &r->vclock, vclock);
+ diag_set(XlogGapError, &r->vclock, vclock);
if (!r->wal_dir.force_recovery)
- throw e;
+ return -1;
/* Ignore missing WALs if force_recovery is set. */
- e->log();
+ diag_log();
say_warn("ignoring a gap in LSN");
goto out;
}
@@ -217,7 +221,6 @@ void
recovery_delete(struct recovery *r)
{
assert(r->watcher == NULL);
-
trigger_destroy(&r->on_close_log);
xdir_destroy(&r->wal_dir);
if (xlog_cursor_is_open(&r->cursor)) {
@@ -237,25 +240,26 @@ recovery_delete(struct recovery *r)
* The reading will be stopped on reaching stop_vclock.
* Use NULL for boundless recover
*/
-static void
+static int
recover_xlog(struct recovery *r, struct xstream *stream,
const struct vclock *stop_vclock)
{
struct xrow_header row;
uint64_t row_count = 0;
- while (xlog_cursor_next_xc(&r->cursor, &row,
- r->wal_dir.force_recovery) == 0) {
+ int rc;
+ while ((rc = xlog_cursor_next(&r->cursor, &row,
+ r->wal_dir.force_recovery)) == 0) {
/*
* Read the next row from xlog file.
*
- * xlog_cursor_next_xc() returns 1 when
+ * xlog_cursor_next() returns 1 when
* it can not read more rows. This doesn't mean
* the file is fully read: it's fully read only
* when EOF marker has been read, see i.eof_read
*/
if (stop_vclock != NULL &&
r->vclock.signature >= stop_vclock->signature)
- return;
+ return 0;
int64_t current_lsn = vclock_get(&r->vclock, row.replica_id);
if (row.lsn <= current_lsn)
continue; /* already applied, skip */
@@ -282,13 +286,16 @@ recover_xlog(struct recovery *r, struct xstream *stream,
row_count / 1000000.);
} else {
if (!r->wal_dir.force_recovery)
- diag_raise();
+ return -1;
say_error("skipping row {%u: %lld}",
(unsigned)row.replica_id, (long long)row.lsn);
diag_log();
}
}
+ if (rc < 0)
+ return -1;
+ return 0;
}
/**
@@ -302,14 +309,14 @@ recover_xlog(struct recovery *r, struct xstream *stream,
* This function will not close r->current_wal if
* recovery was successful.
*/
-void
+int
recover_remaining_wals(struct recovery *r, struct xstream *stream,
const struct vclock *stop_vclock, bool scan_dir)
{
struct vclock *clock;
- if (scan_dir)
- xdir_scan_xc(&r->wal_dir);
+ if (scan_dir && xdir_scan(&r->wal_dir) != 0)
+ return -1;
if (xlog_cursor_is_open(&r->cursor)) {
/* If there's a WAL open, recover from it first. */
@@ -343,21 +350,26 @@ recover_remaining_wals(struct recovery *r, struct xstream *stream,
continue;
}
- recovery_open_log(r, clock);
+ if (recovery_open_log(r, clock) != 0)
+ return -1;
say_info("recover from `%s'", r->cursor.name);
recover_current_wal:
- recover_xlog(r, stream, stop_vclock);
+ if (recover_xlog(r, stream, stop_vclock) != 0)
+ return -1;
}
if (xlog_cursor_is_eof(&r->cursor))
recovery_close_log(r);
- if (stop_vclock != NULL && vclock_compare(&r->vclock, stop_vclock) != 0)
- tnt_raise(XlogGapError, &r->vclock, stop_vclock);
+ if (stop_vclock != NULL && vclock_compare(&r->vclock, stop_vclock) != 0) {
+ diag_set(XlogGapError, &r->vclock, stop_vclock);
+ return -1;
+ }
region_free(&fiber()->gc);
+ return 0;
}
void
@@ -481,7 +493,9 @@ hot_standby_f(va_list ap)
do {
start = vclock_sum(&r->vclock);
- recover_remaining_wals(r, stream, NULL, scan_dir);
+ if (recover_remaining_wals(r, stream, NULL,
+ scan_dir) != 0)
+ diag_raise();
end = vclock_sum(&r->vclock);
/*
@@ -529,7 +543,7 @@ recovery_follow_local(struct recovery *r, struct xstream *stream,
fiber_start(r->watcher, r, stream, wal_dir_rescan_delay);
}
-void
+int
recovery_stop_local(struct recovery *r)
{
if (r->watcher) {
@@ -537,8 +551,9 @@ recovery_stop_local(struct recovery *r)
r->watcher = NULL;
fiber_cancel(f);
if (fiber_join(f) != 0)
- diag_raise();
+ return -1;
}
+ return 0;
}
/* }}} */
diff --git a/src/box/recovery.h b/src/box/recovery.h
index 6e68abc0b..145d9199e 100644
--- a/src/box/recovery.h
+++ b/src/box/recovery.h
@@ -74,7 +74,7 @@ recovery_delete(struct recovery *r);
* @gc_vclock is set to the oldest vclock available in the
* WAL directory.
*/
-void
+int
recovery_scan(struct recovery *r, struct vclock *end_vclock,
struct vclock *gc_vclock);
@@ -82,16 +82,12 @@ void
recovery_follow_local(struct recovery *r, struct xstream *stream,
const char *name, ev_tstamp wal_dir_rescan_delay);
-void
+int
recovery_stop_local(struct recovery *r);
void
recovery_finalize(struct recovery *r);
-#if defined(__cplusplus)
-} /* extern "C" */
-#endif /* defined(__cplusplus) */
-
/**
* Find out if there are new .xlog files since the current
* vclock, and read them all up.
@@ -102,8 +98,12 @@ recovery_finalize(struct recovery *r);
* This function will not close r->current_wal if
* recovery was successful.
*/
-void
+int
recover_remaining_wals(struct recovery *r, struct xstream *stream,
const struct vclock *stop_vclock, bool scan_dir);
+#if defined(__cplusplus)
+} /* extern "C" */
+#endif /* defined(__cplusplus) */
+
#endif /* TARANTOOL_RECOVERY_H_INCLUDED */
diff --git a/src/box/relay.cc b/src/box/relay.cc
index b89632273..d5a1c9c68 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -334,8 +334,9 @@ relay_final_join_f(va_list ap)
/* Send all WALs until stop_vclock */
assert(relay->stream.write != NULL);
- recover_remaining_wals(relay->r, &relay->stream,
- &relay->stop_vclock, true);
+ if (recover_remaining_wals(relay->r, &relay->stream,
+ &relay->stop_vclock, true) != 0)
+ diag_raise();
assert(vclock_compare(&relay->r->vclock, &relay->stop_vclock) == 0);
return 0;
}
@@ -491,11 +492,9 @@ relay_process_wal_event(struct wal_watcher *watcher, unsigned events)
*/
return;
}
- try {
- recover_remaining_wals(relay->r, &relay->stream, NULL,
- (events & WAL_EVENT_ROTATE) != 0);
- } catch (Exception *e) {
- relay_set_error(relay, e);
+ if (recover_remaining_wals(relay->r, &relay->stream, NULL,
+ (events & WAL_EVENT_ROTATE) != 0) != 0) {
+ relay_set_error(relay, diag_last_error(diag_get()));
fiber_cancel(fiber());
}
}
@@ -702,6 +701,8 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
vclock_copy(&relay->local_vclock_at_subscribe, &replicaset.vclock);
relay->r = recovery_new(cfg_gets("wal_dir"), false,
replica_clock);
+ if (relay->r == NULL)
+ diag_raise();
vclock_copy(&relay->tx.vclock, replica_clock);
relay->version_id = replica_version_id;
--
2.25.0