[Tarantool-patches] [PATCH v4 02/11] recovery: do not throw an error

Konstantin Osipov kostja.osipov at gmail.com
Thu Mar 19 10:56:45 MSK 2020


* Georgy Kirichenko <georgy at tarantool.org> [20/02/12 13:09]:
> Relaying from a C-written WAL requires recovery to be C-compliant. So
> get rid of exceptions in the recovery interface.

LGTM, but please solicit another review.

Let's not cook this any longer. Sergey, it would be really great
if you could finish this patch and push it.

Thanks!
> 
> Part of #980
> ---
>  src/box/box.cc      | 19 ++++++++--
>  src/box/recovery.cc | 89 ++++++++++++++++++++++++++-------------------
>  src/box/recovery.h  | 14 +++----
>  src/box/relay.cc    | 15 ++++----
>  4 files changed, 82 insertions(+), 55 deletions(-)
> 
> diff --git a/src/box/box.cc b/src/box/box.cc
> index 68038df18..611100b8b 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -2166,6 +2166,8 @@ local_recovery(const struct tt_uuid *instance_uuid,
>  	recovery = recovery_new(cfg_gets("wal_dir"),
>  				cfg_geti("force_recovery"),
>  				checkpoint_vclock);
> +	if (recovery == NULL)
> +		diag_raise();
>  
>  	/*
>  	 * Make sure we report the actual recovery position
> @@ -2183,7 +2185,8 @@ local_recovery(const struct tt_uuid *instance_uuid,
>  	 * so we must reflect this in replicaset vclock to
>  	 * not attempt to apply these rows twice.
>  	 */
> -	recovery_scan(recovery, &replicaset.vclock, &gc.vclock);
> +	if (recovery_scan(recovery, &replicaset.vclock, &gc.vclock) != 0)
> +		diag_raise();
>  	say_info("instance vclock %s", vclock_to_string(&replicaset.vclock));
>  
>  	if (wal_dir_lock >= 0) {
> @@ -2226,7 +2229,8 @@ local_recovery(const struct tt_uuid *instance_uuid,
>  	memtx_engine_recover_snapshot_xc(memtx, checkpoint_vclock);
>  
>  	engine_begin_final_recovery_xc();
> -	recover_remaining_wals(recovery, &wal_stream.base, NULL, false);
> +	if (recover_remaining_wals(recovery, &wal_stream.base, NULL, false) != 0)
> +		diag_raise();
>  	engine_end_recovery_xc();
>  	/*
>  	 * Leave hot standby mode, if any, only after
> @@ -2239,6 +2243,10 @@ local_recovery(const struct tt_uuid *instance_uuid,
>  				      cfg_getd("wal_dir_rescan_delay"));
>  		while (true) {
>  			if (path_lock(cfg_gets("wal_dir"), &wal_dir_lock)) {
> +				/*
> +				 * Let recovery_stop_local override
> +				 * a path_lock error.
> +				 */
>  				recovery_stop_local(recovery);
>  				diag_raise();
>  			}
> @@ -2246,8 +2254,11 @@ local_recovery(const struct tt_uuid *instance_uuid,
>  				break;
>  			fiber_sleep(0.1);
>  		}
> -		recovery_stop_local(recovery);
> -		recover_remaining_wals(recovery, &wal_stream.base, NULL, true);
> +		if (recovery_stop_local(recovery) != 0)
> +			diag_raise();
> +		if (recover_remaining_wals(recovery, &wal_stream.base, NULL,
> +					   true) != 0)
> +			diag_raise();
>  		/*
>  		 * Advance replica set vclock to reflect records
>  		 * applied in hot standby mode.
> diff --git a/src/box/recovery.cc b/src/box/recovery.cc
> index a1ac2d967..e4aad1296 100644
> --- a/src/box/recovery.cc
> +++ b/src/box/recovery.cc
> @@ -87,14 +87,11 @@ recovery_new(const char *wal_dirname, bool force_recovery,
>  			calloc(1, sizeof(*r));
>  
>  	if (r == NULL) {
> -		tnt_raise(OutOfMemory, sizeof(*r), "malloc",
> -			  "struct recovery");
> +		diag_set(OutOfMemory, sizeof(*r), "malloc",
> +			 "struct recovery");
> +		return NULL;
>  	}
>  
> -	auto guard = make_scoped_guard([=]{
> -		free(r);
> -	});
> -
>  	xdir_create(&r->wal_dir, wal_dirname, XLOG, &INSTANCE_UUID,
>  		    &xlog_opts_default);
>  	r->wal_dir.force_recovery = force_recovery;
> @@ -108,27 +105,31 @@ recovery_new(const char *wal_dirname, bool force_recovery,
>  	 * UUID, see replication/cluster.test for
>  	 * details.
>  	 */
> -	xdir_check_xc(&r->wal_dir);
> +	if (xdir_check(&r->wal_dir) != 0) {
> +		xdir_destroy(&r->wal_dir);
> +		free(r);
> +		return NULL;
> +	}
>  
>  	r->watcher = NULL;
>  	rlist_create(&r->on_close_log);
>  
> -	guard.is_active = false;
>  	return r;
>  }
>  
> -void
> +int
>  recovery_scan(struct recovery *r, struct vclock *end_vclock,
>  	      struct vclock *gc_vclock)
>  {
> -	xdir_scan_xc(&r->wal_dir);
> +	if (xdir_scan(&r->wal_dir) != 0)
> +		return -1;
>  
>  	if (xdir_last_vclock(&r->wal_dir, end_vclock) < 0 ||
>  	    vclock_compare(end_vclock, &r->vclock) < 0) {
>  		/* No xlogs after last checkpoint. */
>  		vclock_copy(gc_vclock, &r->vclock);
>  		vclock_copy(end_vclock, &r->vclock);
> -		return;
> +		return 0;
>  	}
>  
>  	if (xdir_first_vclock(&r->wal_dir, gc_vclock) < 0)
> @@ -137,11 +138,12 @@ recovery_scan(struct recovery *r, struct vclock *end_vclock,
>  	/* Scan the last xlog to find end vclock. */
>  	struct xlog_cursor cursor;
>  	if (xdir_open_cursor(&r->wal_dir, vclock_sum(end_vclock), &cursor) != 0)
> -		return;
> +		return 0;
>  	struct xrow_header row;
>  	while (xlog_cursor_next(&cursor, &row, true) == 0)
>  		vclock_follow_xrow(end_vclock, &row);
>  	xlog_cursor_close(&cursor, false);
> +	return 0;
>  }
>  
>  static inline void
> @@ -156,19 +158,21 @@ recovery_close_log(struct recovery *r)
>  			 r->cursor.name);
>  	}
>  	xlog_cursor_close(&r->cursor, false);
> -	trigger_run_xc(&r->on_close_log, NULL);
> +	/* Suppress a trigger error if one happened. */
> +	trigger_run(&r->on_close_log, NULL);
>  }
>  
> -static void
> +static int
>  recovery_open_log(struct recovery *r, const struct vclock *vclock)
>  {
> -	XlogGapError *e;
>  	struct xlog_meta meta = r->cursor.meta;
>  	enum xlog_cursor_state state = r->cursor.state;
>  
>  	recovery_close_log(r);
>  
> -	xdir_open_cursor_xc(&r->wal_dir, vclock_sum(vclock), &r->cursor);
> +	if (xdir_open_cursor(&r->wal_dir, vclock_sum(vclock),
> +			     &r->cursor) != 0)
> +		return -1;
>  
>  	if (state == XLOG_CURSOR_NEW &&
>  	    vclock_compare(vclock, &r->vclock) > 0) {
> @@ -201,14 +205,14 @@ out:
>  	 */
>  	if (vclock_compare(&r->vclock, vclock) < 0)
>  		vclock_copy(&r->vclock, vclock);
> -	return;
> +	return 0;
>  
>  gap_error:
> -	e = tnt_error(XlogGapError, &r->vclock, vclock);
> +	diag_set(XlogGapError, &r->vclock, vclock);
>  	if (!r->wal_dir.force_recovery)
> -		throw e;
> +		return -1;
>  	/* Ignore missing WALs if force_recovery is set. */
> -	e->log();
> +	diag_log();
>  	say_warn("ignoring a gap in LSN");
>  	goto out;
>  }
> @@ -217,7 +221,6 @@ void
>  recovery_delete(struct recovery *r)
>  {
>  	assert(r->watcher == NULL);
> -
>  	trigger_destroy(&r->on_close_log);
>  	xdir_destroy(&r->wal_dir);
>  	if (xlog_cursor_is_open(&r->cursor)) {
> @@ -237,25 +240,26 @@ recovery_delete(struct recovery *r)
>   * The reading will be stopped on reaching stop_vclock.
>   * Use NULL for boundless recover
>   */
> -static void
> +static int
>  recover_xlog(struct recovery *r, struct xstream *stream,
>  	     const struct vclock *stop_vclock)
>  {
>  	struct xrow_header row;
>  	uint64_t row_count = 0;
> -	while (xlog_cursor_next_xc(&r->cursor, &row,
> -				   r->wal_dir.force_recovery) == 0) {
> +	int rc;
> +	while ((rc = xlog_cursor_next(&r->cursor, &row,
> +				      r->wal_dir.force_recovery)) == 0) {
>  		/*
>  		 * Read the next row from xlog file.
>  		 *
> -		 * xlog_cursor_next_xc() returns 1 when
> +		 * xlog_cursor_next() returns 1 when
>  		 * it can not read more rows. This doesn't mean
>  		 * the file is fully read: it's fully read only
>  		 * when EOF marker has been read, see i.eof_read
>  		 */
>  		if (stop_vclock != NULL &&
>  		    r->vclock.signature >= stop_vclock->signature)
> -			return;
> +			return 0;
>  		int64_t current_lsn = vclock_get(&r->vclock, row.replica_id);
>  		if (row.lsn <= current_lsn)
>  			continue; /* already applied, skip */
> @@ -282,13 +286,16 @@ recover_xlog(struct recovery *r, struct xstream *stream,
>  					 row_count / 1000000.);
>  		} else {
>  			if (!r->wal_dir.force_recovery)
> -				diag_raise();
> +				return -1;
>  
>  			say_error("skipping row {%u: %lld}",
>  				  (unsigned)row.replica_id, (long long)row.lsn);
>  			diag_log();
>  		}
>  	}
> +	if (rc < 0)
> +		return -1;
> +	return 0;
>  }
>  
>  /**
> @@ -302,14 +309,14 @@ recover_xlog(struct recovery *r, struct xstream *stream,
>   * This function will not close r->current_wal if
>   * recovery was successful.
>   */
> -void
> +int
>  recover_remaining_wals(struct recovery *r, struct xstream *stream,
>  		       const struct vclock *stop_vclock, bool scan_dir)
>  {
>  	struct vclock *clock;
>  
> -	if (scan_dir)
> -		xdir_scan_xc(&r->wal_dir);
> +	if (scan_dir && xdir_scan(&r->wal_dir) != 0)
> +		return -1;
>  
>  	if (xlog_cursor_is_open(&r->cursor)) {
>  		/* If there's a WAL open, recover from it first. */
> @@ -343,21 +350,26 @@ recover_remaining_wals(struct recovery *r, struct xstream *stream,
>  			continue;
>  		}
>  
> -		recovery_open_log(r, clock);
> +		if (recovery_open_log(r, clock) != 0)
> +			return -1;
>  
>  		say_info("recover from `%s'", r->cursor.name);
>  
>  recover_current_wal:
> -		recover_xlog(r, stream, stop_vclock);
> +		if (recover_xlog(r, stream, stop_vclock) != 0)
> +			return -1;
>  	}
>  
>  	if (xlog_cursor_is_eof(&r->cursor))
>  		recovery_close_log(r);
>  
> -	if (stop_vclock != NULL && vclock_compare(&r->vclock, stop_vclock) != 0)
> -		tnt_raise(XlogGapError, &r->vclock, stop_vclock);
> +	if (stop_vclock != NULL && vclock_compare(&r->vclock, stop_vclock) != 0) {
> +		diag_set(XlogGapError, &r->vclock, stop_vclock);
> +		return -1;
> +	}
>  
>  	region_free(&fiber()->gc);
> +	return 0;
>  }
>  
>  void
> @@ -481,7 +493,9 @@ hot_standby_f(va_list ap)
>  		do {
>  			start = vclock_sum(&r->vclock);
>  
> -			recover_remaining_wals(r, stream, NULL, scan_dir);
> +			if (recover_remaining_wals(r, stream, NULL,
> +						   scan_dir) != 0)
> +				diag_raise();
>  
>  			end = vclock_sum(&r->vclock);
>  			/*
> @@ -529,7 +543,7 @@ recovery_follow_local(struct recovery *r, struct xstream *stream,
>  	fiber_start(r->watcher, r, stream, wal_dir_rescan_delay);
>  }
>  
> -void
> +int
>  recovery_stop_local(struct recovery *r)
>  {
>  	if (r->watcher) {
> @@ -537,8 +551,9 @@ recovery_stop_local(struct recovery *r)
>  		r->watcher = NULL;
>  		fiber_cancel(f);
>  		if (fiber_join(f) != 0)
> -			diag_raise();
> +			return -1;
>  	}
> +	return 0;
>  }
>  
>  /* }}} */
> diff --git a/src/box/recovery.h b/src/box/recovery.h
> index 6e68abc0b..145d9199e 100644
> --- a/src/box/recovery.h
> +++ b/src/box/recovery.h
> @@ -74,7 +74,7 @@ recovery_delete(struct recovery *r);
>   * @gc_vclock is set to the oldest vclock available in the
>   * WAL directory.
>   */
> -void
> +int
>  recovery_scan(struct recovery *r,  struct vclock *end_vclock,
>  	      struct vclock *gc_vclock);
>  
> @@ -82,16 +82,12 @@ void
>  recovery_follow_local(struct recovery *r, struct xstream *stream,
>  		      const char *name, ev_tstamp wal_dir_rescan_delay);
>  
> -void
> +int
>  recovery_stop_local(struct recovery *r);
>  
>  void
>  recovery_finalize(struct recovery *r);
>  
> -#if defined(__cplusplus)
> -} /* extern "C" */
> -#endif /* defined(__cplusplus) */
> -
>  /**
>   * Find out if there are new .xlog files since the current
>   * vclock, and read them all up.
> @@ -102,8 +98,12 @@ recovery_finalize(struct recovery *r);
>   * This function will not close r->current_wal if
>   * recovery was successful.
>   */
> -void
> +int
>  recover_remaining_wals(struct recovery *r, struct xstream *stream,
>  		       const struct vclock *stop_vclock, bool scan_dir);
>  
> +#if defined(__cplusplus)
> +} /* extern "C" */
> +#endif /* defined(__cplusplus) */
> +
>  #endif /* TARANTOOL_RECOVERY_H_INCLUDED */
> diff --git a/src/box/relay.cc b/src/box/relay.cc
> index b89632273..d5a1c9c68 100644
> --- a/src/box/relay.cc
> +++ b/src/box/relay.cc
> @@ -334,8 +334,9 @@ relay_final_join_f(va_list ap)
>  
>  	/* Send all WALs until stop_vclock */
>  	assert(relay->stream.write != NULL);
> -	recover_remaining_wals(relay->r, &relay->stream,
> -			       &relay->stop_vclock, true);
> +	if (recover_remaining_wals(relay->r, &relay->stream,
> +				   &relay->stop_vclock, true) != 0)
> +		diag_raise();
>  	assert(vclock_compare(&relay->r->vclock, &relay->stop_vclock) == 0);
>  	return 0;
>  }
> @@ -491,11 +492,9 @@ relay_process_wal_event(struct wal_watcher *watcher, unsigned events)
>  		 */
>  		return;
>  	}
> -	try {
> -		recover_remaining_wals(relay->r, &relay->stream, NULL,
> -				       (events & WAL_EVENT_ROTATE) != 0);
> -	} catch (Exception *e) {
> -		relay_set_error(relay, e);
> +	if (recover_remaining_wals(relay->r, &relay->stream, NULL,
> +				   (events & WAL_EVENT_ROTATE) != 0) != 0) {
> +		relay_set_error(relay, diag_last_error(diag_get()));
>  		fiber_cancel(fiber());
>  	}
>  }
> @@ -702,6 +701,8 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
>  	vclock_copy(&relay->local_vclock_at_subscribe, &replicaset.vclock);
>  	relay->r = recovery_new(cfg_gets("wal_dir"), false,
>  			        replica_clock);
> +	if (relay->r == NULL)
> +		diag_raise();
>  	vclock_copy(&relay->tx.vclock, replica_clock);
>  	relay->version_id = replica_version_id;
>  
> -- 
> 2.25.0

-- 
Konstantin Osipov, Moscow, Russia


More information about the Tarantool-patches mailing list