[Tarantool-patches] [PATCH 1/6] recovery: do not throw an error

Georgy Kirichenko georgy at tarantool.org
Tue Nov 19 19:04:52 MSK 2019


Relaying from a WAL written in C requires the recovery interface to be
C-compliant, so stop throwing exceptions from it and return error codes
instead.

Part of #980
---
 src/box/box.cc      | 16 ++++++---
 src/box/recovery.cc | 87 +++++++++++++++++++++++++++------------------
 src/box/recovery.h  | 14 ++++----
 src/box/relay.cc    | 15 ++++----
 4 files changed, 79 insertions(+), 53 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index b119c927b..a53b6e912 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1903,6 +1903,8 @@ local_recovery(const struct tt_uuid *instance_uuid,
 	recovery = recovery_new(cfg_gets("wal_dir"),
 				cfg_geti("force_recovery"),
 				checkpoint_vclock);
+	if (recovery == NULL)
+		diag_raise();
 
 	/*
 	 * Make sure we report the actual recovery position
@@ -1911,6 +1913,7 @@ local_recovery(const struct tt_uuid *instance_uuid,
 	box_vclock = &recovery->vclock;
 	auto guard = make_scoped_guard([&]{
 		box_vclock = &replicaset.vclock;
+		recovery_stop_local(recovery);
 		recovery_delete(recovery);
 	});
 
@@ -1920,7 +1923,8 @@ local_recovery(const struct tt_uuid *instance_uuid,
 	 * so we must reflect this in replicaset vclock to
 	 * not attempt to apply these rows twice.
 	 */
-	recovery_scan(recovery, &replicaset.vclock, &gc.vclock);
+	if (recovery_scan(recovery, &replicaset.vclock, &gc.vclock) != 0)
+		diag_raise();
 	say_info("instance vclock %s", vclock_to_string(&replicaset.vclock));
 
 	if (wal_dir_lock >= 0) {
@@ -1963,7 +1967,8 @@ local_recovery(const struct tt_uuid *instance_uuid,
 	memtx_engine_recover_snapshot_xc(memtx, checkpoint_vclock);
 
 	engine_begin_final_recovery_xc();
-	recover_remaining_wals(recovery, &wal_stream.base, NULL, false);
+	if (recover_remaining_wals(recovery, &wal_stream.base, NULL, false) != 0)
+		diag_raise();
 	engine_end_recovery_xc();
 	/*
 	 * Leave hot standby mode, if any, only after
@@ -1981,8 +1986,11 @@ local_recovery(const struct tt_uuid *instance_uuid,
 				break;
 			fiber_sleep(0.1);
 		}
-		recovery_stop_local(recovery);
-		recover_remaining_wals(recovery, &wal_stream.base, NULL, true);
+		if (recovery_stop_local(recovery) != 0)
+			diag_raise();
+		if (recover_remaining_wals(recovery, &wal_stream.base, NULL,
+					   true) != 0)
+			diag_raise();
 		/*
 		 * Advance replica set vclock to reflect records
 		 * applied in hot standby mode.
diff --git a/src/box/recovery.cc b/src/box/recovery.cc
index d122d618a..4693008f1 100644
--- a/src/box/recovery.cc
+++ b/src/box/recovery.cc
@@ -87,14 +87,11 @@ recovery_new(const char *wal_dirname, bool force_recovery,
 			calloc(1, sizeof(*r));
 
 	if (r == NULL) {
-		tnt_raise(OutOfMemory, sizeof(*r), "malloc",
-			  "struct recovery");
+		diag_set(OutOfMemory, sizeof(*r), "malloc",
+			 "struct recovery");
+		return NULL;
 	}
 
-	auto guard = make_scoped_guard([=]{
-		free(r);
-	});
-
 	xdir_create(&r->wal_dir, wal_dirname, XLOG, &INSTANCE_UUID,
 		    &xlog_opts_default);
 	r->wal_dir.force_recovery = force_recovery;
@@ -108,27 +105,31 @@ recovery_new(const char *wal_dirname, bool force_recovery,
 	 * UUID, see replication/cluster.test for
 	 * details.
 	 */
-	xdir_check_xc(&r->wal_dir);
+	if (xdir_check(&r->wal_dir) != 0) {
+		xdir_destroy(&r->wal_dir);
+		free(r);
+		return NULL;
+	}
 
 	r->watcher = NULL;
 	rlist_create(&r->on_close_log);
 
-	guard.is_active = false;
 	return r;
 }
 
-void
+int
 recovery_scan(struct recovery *r, struct vclock *end_vclock,
 	      struct vclock *gc_vclock)
 {
-	xdir_scan_xc(&r->wal_dir);
+	if (xdir_scan(&r->wal_dir) != 0)
+		return -1;
 
 	if (xdir_last_vclock(&r->wal_dir, end_vclock) < 0 ||
 	    vclock_compare(end_vclock, &r->vclock) < 0) {
 		/* No xlogs after last checkpoint. */
 		vclock_copy(gc_vclock, &r->vclock);
 		vclock_copy(end_vclock, &r->vclock);
-		return;
+		return 0;
 	}
 
 	if (xdir_first_vclock(&r->wal_dir, gc_vclock) < 0)
@@ -137,11 +138,12 @@ recovery_scan(struct recovery *r, struct vclock *end_vclock,
 	/* Scan the last xlog to find end vclock. */
 	struct xlog_cursor cursor;
 	if (xdir_open_cursor(&r->wal_dir, vclock_sum(end_vclock), &cursor) != 0)
-		return;
+		return 0;
 	struct xrow_header row;
 	while (xlog_cursor_next(&cursor, &row, true) == 0)
 		vclock_follow_xrow(end_vclock, &row);
 	xlog_cursor_close(&cursor, false);
+	return 0;
 }
 
 static inline void
@@ -156,19 +158,21 @@ recovery_close_log(struct recovery *r)
 			 r->cursor.name);
 	}
 	xlog_cursor_close(&r->cursor, false);
-	trigger_run_xc(&r->on_close_log, NULL);
+	/* Ignore a trigger error, if one occurred. */
+	trigger_run(&r->on_close_log, NULL);
 }
 
-static void
+static int
 recovery_open_log(struct recovery *r, const struct vclock *vclock)
 {
-	XlogGapError *e;
 	struct xlog_meta meta = r->cursor.meta;
 	enum xlog_cursor_state state = r->cursor.state;
 
 	recovery_close_log(r);
 
-	xdir_open_cursor_xc(&r->wal_dir, vclock_sum(vclock), &r->cursor);
+	if (xdir_open_cursor(&r->wal_dir, vclock_sum(vclock),
+			     &r->cursor) != 0)
+		return -1;
 
 	if (state == XLOG_CURSOR_NEW &&
 	    vclock_compare(vclock, &r->vclock) > 0) {
@@ -201,14 +205,14 @@ out:
 	 */
 	if (vclock_compare(&r->vclock, vclock) < 0)
 		vclock_copy(&r->vclock, vclock);
-	return;
+	return 0;
 
 gap_error:
-	e = tnt_error(XlogGapError, &r->vclock, vclock);
+	diag_set(XlogGapError, &r->vclock, vclock);
 	if (!r->wal_dir.force_recovery)
-		throw e;
+		return -1;
 	/* Ignore missing WALs if force_recovery is set. */
-	e->log();
+	diag_log();
 	say_warn("ignoring a gap in LSN");
 	goto out;
 }
@@ -216,8 +220,9 @@ gap_error:
 void
 recovery_delete(struct recovery *r)
 {
-	recovery_stop_local(r);
+	/* Recovery should be stopped before deleting. */
 
+	assert(r->watcher == NULL);
 	trigger_destroy(&r->on_close_log);
 	xdir_destroy(&r->wal_dir);
 	if (xlog_cursor_is_open(&r->cursor)) {
@@ -237,25 +242,26 @@ recovery_delete(struct recovery *r)
  * The reading will be stopped on reaching stop_vclock.
  * Use NULL for boundless recover
  */
-static void
+static int
 recover_xlog(struct recovery *r, struct xstream *stream,
 	     const struct vclock *stop_vclock)
 {
 	struct xrow_header row;
 	uint64_t row_count = 0;
-	while (xlog_cursor_next_xc(&r->cursor, &row,
-				   r->wal_dir.force_recovery) == 0) {
+	int rc;
+	while ((rc = xlog_cursor_next(&r->cursor, &row,
+				      r->wal_dir.force_recovery)) == 0) {
 		/*
 		 * Read the next row from xlog file.
 		 *
-		 * xlog_cursor_next_xc() returns 1 when
+		 * xlog_cursor_next() returns 1 when
 		 * it can not read more rows. This doesn't mean
 		 * the file is fully read: it's fully read only
 		 * when EOF marker has been read, see i.eof_read
 		 */
 		if (stop_vclock != NULL &&
 		    r->vclock.signature >= stop_vclock->signature)
-			return;
+			return 0;
 		int64_t current_lsn = vclock_get(&r->vclock, row.replica_id);
 		if (row.lsn <= current_lsn)
 			continue; /* already applied, skip */
@@ -279,13 +285,16 @@ recover_xlog(struct recovery *r, struct xstream *stream,
 					 row_count / 1000000.);
 		} else {
 			if (!r->wal_dir.force_recovery)
-				diag_raise();
+				return -1;
 
 			say_error("skipping row {%u: %lld}",
 				  (unsigned)row.replica_id, (long long)row.lsn);
 			diag_log();
 		}
 	}
+	if (rc < 0)
+		return -1;
+	return 0;
 }
 
 /**
@@ -299,7 +308,7 @@ recover_xlog(struct recovery *r, struct xstream *stream,
  * This function will not close r->current_wal if
  * recovery was successful.
  */
-void
+int
 recover_remaining_wals(struct recovery *r, struct xstream *stream,
 		       const struct vclock *stop_vclock, bool scan_dir)
 {
@@ -340,21 +349,26 @@ recover_remaining_wals(struct recovery *r, struct xstream *stream,
 			continue;
 		}
 
-		recovery_open_log(r, clock);
+		if (recovery_open_log(r, clock) != 0)
+			return -1;
 
 		say_info("recover from `%s'", r->cursor.name);
 
 recover_current_wal:
-		recover_xlog(r, stream, stop_vclock);
+		if (recover_xlog(r, stream, stop_vclock) != 0)
+			return -1;
 	}
 
 	if (xlog_cursor_is_eof(&r->cursor))
 		recovery_close_log(r);
 
-	if (stop_vclock != NULL && vclock_compare(&r->vclock, stop_vclock) != 0)
-		tnt_raise(XlogGapError, &r->vclock, stop_vclock);
+	if (stop_vclock != NULL && vclock_compare(&r->vclock, stop_vclock) != 0) {
+		diag_set(XlogGapError, &r->vclock, stop_vclock);
+		return -1;
+	}
 
 	region_free(&fiber()->gc);
+	return 0;
 }
 
 void
@@ -478,7 +492,9 @@ hot_standby_f(va_list ap)
 		do {
 			start = vclock_sum(&r->vclock);
 
-			recover_remaining_wals(r, stream, NULL, scan_dir);
+			if (recover_remaining_wals(r, stream, NULL,
+						   scan_dir) != 0)
+				diag_raise();
 
 			end = vclock_sum(&r->vclock);
 			/*
@@ -526,7 +542,7 @@ recovery_follow_local(struct recovery *r, struct xstream *stream,
 	fiber_start(r->watcher, r, stream, wal_dir_rescan_delay);
 }
 
-void
+int
 recovery_stop_local(struct recovery *r)
 {
 	if (r->watcher) {
@@ -534,8 +550,9 @@ recovery_stop_local(struct recovery *r)
 		r->watcher = NULL;
 		fiber_cancel(f);
 		if (fiber_join(f) != 0)
-			diag_raise();
+			return -1;
 	}
+	return 0;
 }
 
 /* }}} */
diff --git a/src/box/recovery.h b/src/box/recovery.h
index 6e68abc0b..145d9199e 100644
--- a/src/box/recovery.h
+++ b/src/box/recovery.h
@@ -74,7 +74,7 @@ recovery_delete(struct recovery *r);
  * @gc_vclock is set to the oldest vclock available in the
  * WAL directory.
  */
-void
+int
 recovery_scan(struct recovery *r,  struct vclock *end_vclock,
 	      struct vclock *gc_vclock);
 
@@ -82,16 +82,12 @@ void
 recovery_follow_local(struct recovery *r, struct xstream *stream,
 		      const char *name, ev_tstamp wal_dir_rescan_delay);
 
-void
+int
 recovery_stop_local(struct recovery *r);
 
 void
 recovery_finalize(struct recovery *r);
 
-#if defined(__cplusplus)
-} /* extern "C" */
-#endif /* defined(__cplusplus) */
-
 /**
  * Find out if there are new .xlog files since the current
  * vclock, and read them all up.
@@ -102,8 +98,12 @@ recovery_finalize(struct recovery *r);
  * This function will not close r->current_wal if
  * recovery was successful.
  */
-void
+int
 recover_remaining_wals(struct recovery *r, struct xstream *stream,
 		       const struct vclock *stop_vclock, bool scan_dir);
 
+#if defined(__cplusplus)
+} /* extern "C" */
+#endif /* defined(__cplusplus) */
+
 #endif /* TARANTOOL_RECOVERY_H_INCLUDED */
diff --git a/src/box/relay.cc b/src/box/relay.cc
index e849fcf4f..5c2b0067e 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -335,8 +335,9 @@ relay_final_join_f(va_list ap)
 
 	/* Send all WALs until stop_vclock */
 	assert(relay->stream.write != NULL);
-	recover_remaining_wals(relay->r, &relay->stream,
-			       &relay->stop_vclock, true);
+	if (recover_remaining_wals(relay->r, &relay->stream,
+				   &relay->stop_vclock, true) != 0)
+		diag_raise();
 	assert(vclock_compare(&relay->r->vclock, &relay->stop_vclock) == 0);
 	return 0;
 }
@@ -492,11 +493,9 @@ relay_process_wal_event(struct wal_watcher *watcher, unsigned events)
 		 */
 		return;
 	}
-	try {
-		recover_remaining_wals(relay->r, &relay->stream, NULL,
-				       (events & WAL_EVENT_ROTATE) != 0);
-	} catch (Exception *e) {
-		relay_set_error(relay, e);
+	if (recover_remaining_wals(relay->r, &relay->stream, NULL,
+				   (events & WAL_EVENT_ROTATE) != 0) != 0) {
+		relay_set_error(relay, diag_last_error(diag_get()));
 		fiber_cancel(fiber());
 	}
 }
@@ -697,6 +696,8 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
 	vclock_copy(&relay->local_vclock_at_subscribe, &replicaset.vclock);
 	relay->r = recovery_new(cfg_gets("wal_dir"), false,
 			        replica_clock);
+	if (relay->r == NULL)
+		diag_raise();
 	vclock_copy(&relay->tx.vclock, replica_clock);
 	relay->version_id = replica_version_id;
 
-- 
2.24.0



More information about the Tarantool-patches mailing list