Tarantool development patches archive
 help / color / mirror / Atom feed
From: Konstantin Osipov <kostja.osipov@gmail.com>
To: Georgy Kirichenko <georgy@tarantool.org>, sergepetrenko@tarantool.org
Cc: tarantool-patches@dev.tarantool.org
Subject: Re: [Tarantool-patches] [PATCH v4 02/11] recovery: do not throw an error
Date: Thu, 19 Mar 2020 10:56:45 +0300	[thread overview]
Message-ID: <20200319075645.GC3227@atlas> (raw)
In-Reply-To: <5cf963da3d429fdf99120295c9131fd439c70229.1581500169.git.georgy@tarantool.org>

* Georgy Kirichenko <georgy@tarantool.org> [20/02/12 13:09]:
> Relaying from the C-written WAL requires recovery to be C-compliant, so
> get rid of exceptions in the recovery interface.

LGTM, but please solicit another review.

Let's not cook this any longer. Sergey, it would be really great
if you could finish this patch and push it.

Thanks!
> 
> Part of #980
> ---
>  src/box/box.cc      | 19 ++++++++--
>  src/box/recovery.cc | 89 ++++++++++++++++++++++++++-------------------
>  src/box/recovery.h  | 14 +++----
>  src/box/relay.cc    | 15 ++++----
>  4 files changed, 82 insertions(+), 55 deletions(-)
> 
> diff --git a/src/box/box.cc b/src/box/box.cc
> index 68038df18..611100b8b 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -2166,6 +2166,8 @@ local_recovery(const struct tt_uuid *instance_uuid,
>  	recovery = recovery_new(cfg_gets("wal_dir"),
>  				cfg_geti("force_recovery"),
>  				checkpoint_vclock);
> +	if (recovery == NULL)
> +		diag_raise();
>  
>  	/*
>  	 * Make sure we report the actual recovery position
> @@ -2183,7 +2185,8 @@ local_recovery(const struct tt_uuid *instance_uuid,
>  	 * so we must reflect this in replicaset vclock to
>  	 * not attempt to apply these rows twice.
>  	 */
> -	recovery_scan(recovery, &replicaset.vclock, &gc.vclock);
> +	if (recovery_scan(recovery, &replicaset.vclock, &gc.vclock) != 0)
> +		diag_raise();
>  	say_info("instance vclock %s", vclock_to_string(&replicaset.vclock));
>  
>  	if (wal_dir_lock >= 0) {
> @@ -2226,7 +2229,8 @@ local_recovery(const struct tt_uuid *instance_uuid,
>  	memtx_engine_recover_snapshot_xc(memtx, checkpoint_vclock);
>  
>  	engine_begin_final_recovery_xc();
> -	recover_remaining_wals(recovery, &wal_stream.base, NULL, false);
> +	if (recover_remaining_wals(recovery, &wal_stream.base, NULL, false) != 0)
> +		diag_raise();
>  	engine_end_recovery_xc();
>  	/*
>  	 * Leave hot standby mode, if any, only after
> @@ -2239,6 +2243,10 @@ local_recovery(const struct tt_uuid *instance_uuid,
>  				      cfg_getd("wal_dir_rescan_delay"));
>  		while (true) {
>  			if (path_lock(cfg_gets("wal_dir"), &wal_dir_lock)) {
> +				/*
> +				 * Let recovery_stop_local override
> +				 * a path_lock error.
> +				 */
>  				recovery_stop_local(recovery);
>  				diag_raise();
>  			}
> @@ -2246,8 +2254,11 @@ local_recovery(const struct tt_uuid *instance_uuid,
>  				break;
>  			fiber_sleep(0.1);
>  		}
> -		recovery_stop_local(recovery);
> -		recover_remaining_wals(recovery, &wal_stream.base, NULL, true);
> +		if (recovery_stop_local(recovery) != 0)
> +			diag_raise();
> +		if (recover_remaining_wals(recovery, &wal_stream.base, NULL,
> +					   true) != 0)
> +			diag_raise();
>  		/*
>  		 * Advance replica set vclock to reflect records
>  		 * applied in hot standby mode.
> diff --git a/src/box/recovery.cc b/src/box/recovery.cc
> index a1ac2d967..e4aad1296 100644
> --- a/src/box/recovery.cc
> +++ b/src/box/recovery.cc
> @@ -87,14 +87,11 @@ recovery_new(const char *wal_dirname, bool force_recovery,
>  			calloc(1, sizeof(*r));
>  
>  	if (r == NULL) {
> -		tnt_raise(OutOfMemory, sizeof(*r), "malloc",
> -			  "struct recovery");
> +		diag_set(OutOfMemory, sizeof(*r), "malloc",
> +			 "struct recovery");
> +		return NULL;
>  	}
>  
> -	auto guard = make_scoped_guard([=]{
> -		free(r);
> -	});
> -
>  	xdir_create(&r->wal_dir, wal_dirname, XLOG, &INSTANCE_UUID,
>  		    &xlog_opts_default);
>  	r->wal_dir.force_recovery = force_recovery;
> @@ -108,27 +105,31 @@ recovery_new(const char *wal_dirname, bool force_recovery,
>  	 * UUID, see replication/cluster.test for
>  	 * details.
>  	 */
> -	xdir_check_xc(&r->wal_dir);
> +	if (xdir_check(&r->wal_dir) != 0) {
> +		xdir_destroy(&r->wal_dir);
> +		free(r);
> +		return NULL;
> +	}
>  
>  	r->watcher = NULL;
>  	rlist_create(&r->on_close_log);
>  
> -	guard.is_active = false;
>  	return r;
>  }
>  
> -void
> +int
>  recovery_scan(struct recovery *r, struct vclock *end_vclock,
>  	      struct vclock *gc_vclock)
>  {
> -	xdir_scan_xc(&r->wal_dir);
> +	if (xdir_scan(&r->wal_dir) != 0)
> +		return -1;
>  
>  	if (xdir_last_vclock(&r->wal_dir, end_vclock) < 0 ||
>  	    vclock_compare(end_vclock, &r->vclock) < 0) {
>  		/* No xlogs after last checkpoint. */
>  		vclock_copy(gc_vclock, &r->vclock);
>  		vclock_copy(end_vclock, &r->vclock);
> -		return;
> +		return 0;
>  	}
>  
>  	if (xdir_first_vclock(&r->wal_dir, gc_vclock) < 0)
> @@ -137,11 +138,12 @@ recovery_scan(struct recovery *r, struct vclock *end_vclock,
>  	/* Scan the last xlog to find end vclock. */
>  	struct xlog_cursor cursor;
>  	if (xdir_open_cursor(&r->wal_dir, vclock_sum(end_vclock), &cursor) != 0)
> -		return;
> +		return 0;
>  	struct xrow_header row;
>  	while (xlog_cursor_next(&cursor, &row, true) == 0)
>  		vclock_follow_xrow(end_vclock, &row);
>  	xlog_cursor_close(&cursor, false);
> +	return 0;
>  }
>  
>  static inline void
> @@ -156,19 +158,21 @@ recovery_close_log(struct recovery *r)
>  			 r->cursor.name);
>  	}
>  	xlog_cursor_close(&r->cursor, false);
> -	trigger_run_xc(&r->on_close_log, NULL);
> +	/* Suppress a trigger error if happened. */
> +	trigger_run(&r->on_close_log, NULL);
>  }
>  
> -static void
> +static int
>  recovery_open_log(struct recovery *r, const struct vclock *vclock)
>  {
> -	XlogGapError *e;
>  	struct xlog_meta meta = r->cursor.meta;
>  	enum xlog_cursor_state state = r->cursor.state;
>  
>  	recovery_close_log(r);
>  
> -	xdir_open_cursor_xc(&r->wal_dir, vclock_sum(vclock), &r->cursor);
> +	if (xdir_open_cursor(&r->wal_dir, vclock_sum(vclock),
> +			     &r->cursor) != 0)
> +		return -1;
>  
>  	if (state == XLOG_CURSOR_NEW &&
>  	    vclock_compare(vclock, &r->vclock) > 0) {
> @@ -201,14 +205,14 @@ out:
>  	 */
>  	if (vclock_compare(&r->vclock, vclock) < 0)
>  		vclock_copy(&r->vclock, vclock);
> -	return;
> +	return 0;
>  
>  gap_error:
> -	e = tnt_error(XlogGapError, &r->vclock, vclock);
> +	diag_set(XlogGapError, &r->vclock, vclock);
>  	if (!r->wal_dir.force_recovery)
> -		throw e;
> +		return -1;
>  	/* Ignore missing WALs if force_recovery is set. */
> -	e->log();
> +	diag_log();
>  	say_warn("ignoring a gap in LSN");
>  	goto out;
>  }
> @@ -217,7 +221,6 @@ void
>  recovery_delete(struct recovery *r)
>  {
>  	assert(r->watcher == NULL);
> -
>  	trigger_destroy(&r->on_close_log);
>  	xdir_destroy(&r->wal_dir);
>  	if (xlog_cursor_is_open(&r->cursor)) {
> @@ -237,25 +240,26 @@ recovery_delete(struct recovery *r)
>   * The reading will be stopped on reaching stop_vclock.
>   * Use NULL for boundless recover
>   */
> -static void
> +static int
>  recover_xlog(struct recovery *r, struct xstream *stream,
>  	     const struct vclock *stop_vclock)
>  {
>  	struct xrow_header row;
>  	uint64_t row_count = 0;
> -	while (xlog_cursor_next_xc(&r->cursor, &row,
> -				   r->wal_dir.force_recovery) == 0) {
> +	int rc;
> +	while ((rc = xlog_cursor_next(&r->cursor, &row,
> +				      r->wal_dir.force_recovery)) == 0) {
>  		/*
>  		 * Read the next row from xlog file.
>  		 *
> -		 * xlog_cursor_next_xc() returns 1 when
> +		 * xlog_cursor_next() returns 1 when
>  		 * it can not read more rows. This doesn't mean
>  		 * the file is fully read: it's fully read only
>  		 * when EOF marker has been read, see i.eof_read
>  		 */
>  		if (stop_vclock != NULL &&
>  		    r->vclock.signature >= stop_vclock->signature)
> -			return;
> +			return 0;
>  		int64_t current_lsn = vclock_get(&r->vclock, row.replica_id);
>  		if (row.lsn <= current_lsn)
>  			continue; /* already applied, skip */
> @@ -282,13 +286,16 @@ recover_xlog(struct recovery *r, struct xstream *stream,
>  					 row_count / 1000000.);
>  		} else {
>  			if (!r->wal_dir.force_recovery)
> -				diag_raise();
> +				return -1;
>  
>  			say_error("skipping row {%u: %lld}",
>  				  (unsigned)row.replica_id, (long long)row.lsn);
>  			diag_log();
>  		}
>  	}
> +	if (rc < 0)
> +		return -1;
> +	return 0;
>  }
>  
>  /**
> @@ -302,14 +309,14 @@ recover_xlog(struct recovery *r, struct xstream *stream,
>   * This function will not close r->current_wal if
>   * recovery was successful.
>   */
> -void
> +int
>  recover_remaining_wals(struct recovery *r, struct xstream *stream,
>  		       const struct vclock *stop_vclock, bool scan_dir)
>  {
>  	struct vclock *clock;
>  
> -	if (scan_dir)
> -		xdir_scan_xc(&r->wal_dir);
> +	if (scan_dir && xdir_scan(&r->wal_dir) != 0)
> +		return -1;
>  
>  	if (xlog_cursor_is_open(&r->cursor)) {
>  		/* If there's a WAL open, recover from it first. */
> @@ -343,21 +350,26 @@ recover_remaining_wals(struct recovery *r, struct xstream *stream,
>  			continue;
>  		}
>  
> -		recovery_open_log(r, clock);
> +		if (recovery_open_log(r, clock) != 0)
> +			return -1;
>  
>  		say_info("recover from `%s'", r->cursor.name);
>  
>  recover_current_wal:
> -		recover_xlog(r, stream, stop_vclock);
> +		if (recover_xlog(r, stream, stop_vclock) != 0)
> +			return -1;
>  	}
>  
>  	if (xlog_cursor_is_eof(&r->cursor))
>  		recovery_close_log(r);
>  
> -	if (stop_vclock != NULL && vclock_compare(&r->vclock, stop_vclock) != 0)
> -		tnt_raise(XlogGapError, &r->vclock, stop_vclock);
> +	if (stop_vclock != NULL && vclock_compare(&r->vclock, stop_vclock) != 0) {
> +		diag_set(XlogGapError, &r->vclock, stop_vclock);
> +		return -1;
> +	}
>  
>  	region_free(&fiber()->gc);
> +	return 0;
>  }
>  
>  void
> @@ -481,7 +493,9 @@ hot_standby_f(va_list ap)
>  		do {
>  			start = vclock_sum(&r->vclock);
>  
> -			recover_remaining_wals(r, stream, NULL, scan_dir);
> +			if (recover_remaining_wals(r, stream, NULL,
> +						   scan_dir) != 0)
> +				diag_raise();
>  
>  			end = vclock_sum(&r->vclock);
>  			/*
> @@ -529,7 +543,7 @@ recovery_follow_local(struct recovery *r, struct xstream *stream,
>  	fiber_start(r->watcher, r, stream, wal_dir_rescan_delay);
>  }
>  
> -void
> +int
>  recovery_stop_local(struct recovery *r)
>  {
>  	if (r->watcher) {
> @@ -537,8 +551,9 @@ recovery_stop_local(struct recovery *r)
>  		r->watcher = NULL;
>  		fiber_cancel(f);
>  		if (fiber_join(f) != 0)
> -			diag_raise();
> +			return -1;
>  	}
> +	return 0;
>  }
>  
>  /* }}} */
> diff --git a/src/box/recovery.h b/src/box/recovery.h
> index 6e68abc0b..145d9199e 100644
> --- a/src/box/recovery.h
> +++ b/src/box/recovery.h
> @@ -74,7 +74,7 @@ recovery_delete(struct recovery *r);
>   * @gc_vclock is set to the oldest vclock available in the
>   * WAL directory.
>   */
> -void
> +int
>  recovery_scan(struct recovery *r,  struct vclock *end_vclock,
>  	      struct vclock *gc_vclock);
>  
> @@ -82,16 +82,12 @@ void
>  recovery_follow_local(struct recovery *r, struct xstream *stream,
>  		      const char *name, ev_tstamp wal_dir_rescan_delay);
>  
> -void
> +int
>  recovery_stop_local(struct recovery *r);
>  
>  void
>  recovery_finalize(struct recovery *r);
>  
> -#if defined(__cplusplus)
> -} /* extern "C" */
> -#endif /* defined(__cplusplus) */
> -
>  /**
>   * Find out if there are new .xlog files since the current
>   * vclock, and read them all up.
> @@ -102,8 +98,12 @@ recovery_finalize(struct recovery *r);
>   * This function will not close r->current_wal if
>   * recovery was successful.
>   */
> -void
> +int
>  recover_remaining_wals(struct recovery *r, struct xstream *stream,
>  		       const struct vclock *stop_vclock, bool scan_dir);
>  
> +#if defined(__cplusplus)
> +} /* extern "C" */
> +#endif /* defined(__cplusplus) */
> +
>  #endif /* TARANTOOL_RECOVERY_H_INCLUDED */
> diff --git a/src/box/relay.cc b/src/box/relay.cc
> index b89632273..d5a1c9c68 100644
> --- a/src/box/relay.cc
> +++ b/src/box/relay.cc
> @@ -334,8 +334,9 @@ relay_final_join_f(va_list ap)
>  
>  	/* Send all WALs until stop_vclock */
>  	assert(relay->stream.write != NULL);
> -	recover_remaining_wals(relay->r, &relay->stream,
> -			       &relay->stop_vclock, true);
> +	if (recover_remaining_wals(relay->r, &relay->stream,
> +				   &relay->stop_vclock, true) != 0)
> +		diag_raise();
>  	assert(vclock_compare(&relay->r->vclock, &relay->stop_vclock) == 0);
>  	return 0;
>  }
> @@ -491,11 +492,9 @@ relay_process_wal_event(struct wal_watcher *watcher, unsigned events)
>  		 */
>  		return;
>  	}
> -	try {
> -		recover_remaining_wals(relay->r, &relay->stream, NULL,
> -				       (events & WAL_EVENT_ROTATE) != 0);
> -	} catch (Exception *e) {
> -		relay_set_error(relay, e);
> +	if (recover_remaining_wals(relay->r, &relay->stream, NULL,
> +				   (events & WAL_EVENT_ROTATE) != 0) != 0) {
> +		relay_set_error(relay, diag_last_error(diag_get()));
>  		fiber_cancel(fiber());
>  	}
>  }
> @@ -702,6 +701,8 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
>  	vclock_copy(&relay->local_vclock_at_subscribe, &replicaset.vclock);
>  	relay->r = recovery_new(cfg_gets("wal_dir"), false,
>  			        replica_clock);
> +	if (relay->r == NULL)
> +		diag_raise();
>  	vclock_copy(&relay->tx.vclock, replica_clock);
>  	relay->version_id = replica_version_id;
>  
> -- 
> 2.25.0

-- 
Konstantin Osipov, Moscow, Russia

  reply	other threads:[~2020-03-19  7:56 UTC|newest]

Thread overview: 16+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2020-02-12  9:39 [Tarantool-patches] [PATCH v4 00/11] Replication from memory Georgy Kirichenko
2020-02-12  9:39 ` [Tarantool-patches] [PATCH v4 01/11] recovery: do not call recovery_stop_local inside recovery_delete Georgy Kirichenko
2020-03-19  7:55   ` Konstantin Osipov
2020-02-12  9:39 ` [Tarantool-patches] [PATCH v4 02/11] recovery: do not throw an error Georgy Kirichenko
2020-03-19  7:56   ` Konstantin Osipov [this message]
2020-02-12  9:39 ` [Tarantool-patches] [PATCH v4 03/11] coio: do not allow parallel usage of coio Georgy Kirichenko
2020-03-19 18:09   ` Konstantin Osipov
2020-02-12  9:39 ` [Tarantool-patches] [PATCH v4 04/11] coio: do not throw an error, minor refactoring Georgy Kirichenko
2020-03-23  6:59   ` Konstantin Osipov
2020-02-12  9:39 ` [Tarantool-patches] [PATCH v4 05/11] xstream: get rid of an exception Georgy Kirichenko
2020-02-12  9:39 ` [Tarantool-patches] [PATCH v4 06/11] wal: extract log write batch into a separate routine Georgy Kirichenko
2020-02-12  9:39 ` [Tarantool-patches] [PATCH v4 07/11] wal: matrix clock structure Georgy Kirichenko
2020-02-12  9:39 ` [Tarantool-patches] [PATCH v4 08/11] wal: track relay vclock and collect logs in wal thread Georgy Kirichenko
2020-02-12  9:39 ` [Tarantool-patches] [PATCH v4 09/11] wal: xrow memory buffer and cursor Georgy Kirichenko
2020-02-12  9:39 ` [Tarantool-patches] [PATCH v4 10/11] wal: use a xrow buffer object for entry encoding Georgy Kirichenko
2020-02-12  9:39 ` [Tarantool-patches] [PATCH v4 11/11] replication: use wal memory buffer to fetch rows Georgy Kirichenko

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20200319075645.GC3227@atlas \
    --to=kostja.osipov@gmail.com \
    --cc=georgy@tarantool.org \
    --cc=sergepetrenko@tarantool.org \
    --cc=tarantool-patches@dev.tarantool.org \
    --subject='Re: [Tarantool-patches] [PATCH v4 02/11] recovery: do not throw an error' \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

This is a public inbox; see the mirroring instructions
for how to clone and mirror all data and code used for this inbox.