From: Konstantin Osipov <kostja.osipov@gmail.com> To: Georgy Kirichenko <georgy@tarantool.org>, sergepetrenko@tarantool.org Cc: tarantool-patches@dev.tarantool.org Subject: Re: [Tarantool-patches] [PATCH v4 02/11] recovery: do not throw an error Date: Thu, 19 Mar 2020 10:56:45 +0300 [thread overview] Message-ID: <20200319075645.GC3227@atlas> (raw) In-Reply-To: <5cf963da3d429fdf99120295c9131fd439c70229.1581500169.git.georgy@tarantool.org> * Georgy Kirichenko <georgy@tarantool.org> [20/02/12 13:09]: > Relaying from C-written wal requires recovery to be a C-compliant. So > get rid of exception from recovery interface. LGTM, but please solicit another review. Let's not cook this any longer, Sergey, it would be really great if you finish this patch and push it. Thanks! > > Part of #980 > --- > src/box/box.cc | 19 ++++++++-- > src/box/recovery.cc | 89 ++++++++++++++++++++++++++------------------- > src/box/recovery.h | 14 +++---- > src/box/relay.cc | 15 ++++---- > 4 files changed, 82 insertions(+), 55 deletions(-) > > diff --git a/src/box/box.cc b/src/box/box.cc > index 68038df18..611100b8b 100644 > --- a/src/box/box.cc > +++ b/src/box/box.cc > @@ -2166,6 +2166,8 @@ local_recovery(const struct tt_uuid *instance_uuid, > recovery = recovery_new(cfg_gets("wal_dir"), > cfg_geti("force_recovery"), > checkpoint_vclock); > + if (recovery == NULL) > + diag_raise(); > > /* > * Make sure we report the actual recovery position > @@ -2183,7 +2185,8 @@ local_recovery(const struct tt_uuid *instance_uuid, > * so we must reflect this in replicaset vclock to > * not attempt to apply these rows twice. 
> */ > - recovery_scan(recovery, &replicaset.vclock, &gc.vclock); > + if (recovery_scan(recovery, &replicaset.vclock, &gc.vclock) != 0) > + diag_raise(); > say_info("instance vclock %s", vclock_to_string(&replicaset.vclock)); > > if (wal_dir_lock >= 0) { > @@ -2226,7 +2229,8 @@ local_recovery(const struct tt_uuid *instance_uuid, > memtx_engine_recover_snapshot_xc(memtx, checkpoint_vclock); > > engine_begin_final_recovery_xc(); > - recover_remaining_wals(recovery, &wal_stream.base, NULL, false); > + if (recover_remaining_wals(recovery, &wal_stream.base, NULL, false) != 0) > + diag_raise(); > engine_end_recovery_xc(); > /* > * Leave hot standby mode, if any, only after > @@ -2239,6 +2243,10 @@ local_recovery(const struct tt_uuid *instance_uuid, > cfg_getd("wal_dir_rescan_delay")); > while (true) { > if (path_lock(cfg_gets("wal_dir"), &wal_dir_lock)) { > + /* > + * Let recovery_stop_local override > + * a path_lock error. > + */ > recovery_stop_local(recovery); > diag_raise(); > } > @@ -2246,8 +2254,11 @@ local_recovery(const struct tt_uuid *instance_uuid, > break; > fiber_sleep(0.1); > } > - recovery_stop_local(recovery); > - recover_remaining_wals(recovery, &wal_stream.base, NULL, true); > + if (recovery_stop_local(recovery) != 0) > + diag_raise(); > + if (recover_remaining_wals(recovery, &wal_stream.base, NULL, > + true) != 0) > + diag_raise(); > /* > * Advance replica set vclock to reflect records > * applied in hot standby mode. 
> diff --git a/src/box/recovery.cc b/src/box/recovery.cc > index a1ac2d967..e4aad1296 100644 > --- a/src/box/recovery.cc > +++ b/src/box/recovery.cc > @@ -87,14 +87,11 @@ recovery_new(const char *wal_dirname, bool force_recovery, > calloc(1, sizeof(*r)); > > if (r == NULL) { > - tnt_raise(OutOfMemory, sizeof(*r), "malloc", > - "struct recovery"); > + diag_set(OutOfMemory, sizeof(*r), "malloc", > + "struct recovery"); > + return NULL; > } > > - auto guard = make_scoped_guard([=]{ > - free(r); > - }); > - > xdir_create(&r->wal_dir, wal_dirname, XLOG, &INSTANCE_UUID, > &xlog_opts_default); > r->wal_dir.force_recovery = force_recovery; > @@ -108,27 +105,31 @@ recovery_new(const char *wal_dirname, bool force_recovery, > * UUID, see replication/cluster.test for > * details. > */ > - xdir_check_xc(&r->wal_dir); > + if (xdir_check(&r->wal_dir) != 0) { > + xdir_destroy(&r->wal_dir); > + free(r); > + return NULL; > + } > > r->watcher = NULL; > rlist_create(&r->on_close_log); > > - guard.is_active = false; > return r; > } > > -void > +int > recovery_scan(struct recovery *r, struct vclock *end_vclock, > struct vclock *gc_vclock) > { > - xdir_scan_xc(&r->wal_dir); > + if (xdir_scan(&r->wal_dir) != 0) > + return -1; > > if (xdir_last_vclock(&r->wal_dir, end_vclock) < 0 || > vclock_compare(end_vclock, &r->vclock) < 0) { > /* No xlogs after last checkpoint. */ > vclock_copy(gc_vclock, &r->vclock); > vclock_copy(end_vclock, &r->vclock); > - return; > + return 0; > } > > if (xdir_first_vclock(&r->wal_dir, gc_vclock) < 0) > @@ -137,11 +138,12 @@ recovery_scan(struct recovery *r, struct vclock *end_vclock, > /* Scan the last xlog to find end vclock. 
*/ > struct xlog_cursor cursor; > if (xdir_open_cursor(&r->wal_dir, vclock_sum(end_vclock), &cursor) != 0) > - return; > + return 0; > struct xrow_header row; > while (xlog_cursor_next(&cursor, &row, true) == 0) > vclock_follow_xrow(end_vclock, &row); > xlog_cursor_close(&cursor, false); > + return 0; > } > > static inline void > @@ -156,19 +158,21 @@ recovery_close_log(struct recovery *r) > r->cursor.name); > } > xlog_cursor_close(&r->cursor, false); > - trigger_run_xc(&r->on_close_log, NULL); > + /* Suppress a trigger error if happened. */ > + trigger_run(&r->on_close_log, NULL); > } > > -static void > +static int > recovery_open_log(struct recovery *r, const struct vclock *vclock) > { > - XlogGapError *e; > struct xlog_meta meta = r->cursor.meta; > enum xlog_cursor_state state = r->cursor.state; > > recovery_close_log(r); > > - xdir_open_cursor_xc(&r->wal_dir, vclock_sum(vclock), &r->cursor); > + if (xdir_open_cursor(&r->wal_dir, vclock_sum(vclock), > + &r->cursor) != 0) > + return -1; > > if (state == XLOG_CURSOR_NEW && > vclock_compare(vclock, &r->vclock) > 0) { > @@ -201,14 +205,14 @@ out: > */ > if (vclock_compare(&r->vclock, vclock) < 0) > vclock_copy(&r->vclock, vclock); > - return; > + return 0; > > gap_error: > - e = tnt_error(XlogGapError, &r->vclock, vclock); > + diag_set(XlogGapError, &r->vclock, vclock); > if (!r->wal_dir.force_recovery) > - throw e; > + return -1; > /* Ignore missing WALs if force_recovery is set. */ > - e->log(); > + diag_log(); > say_warn("ignoring a gap in LSN"); > goto out; > } > @@ -217,7 +221,6 @@ void > recovery_delete(struct recovery *r) > { > assert(r->watcher == NULL); > - > trigger_destroy(&r->on_close_log); > xdir_destroy(&r->wal_dir); > if (xlog_cursor_is_open(&r->cursor)) { > @@ -237,25 +240,26 @@ recovery_delete(struct recovery *r) > * The reading will be stopped on reaching stop_vclock. 
> * Use NULL for boundless recover > */ > -static void > +static int > recover_xlog(struct recovery *r, struct xstream *stream, > const struct vclock *stop_vclock) > { > struct xrow_header row; > uint64_t row_count = 0; > - while (xlog_cursor_next_xc(&r->cursor, &row, > - r->wal_dir.force_recovery) == 0) { > + int rc; > + while ((rc = xlog_cursor_next(&r->cursor, &row, > + r->wal_dir.force_recovery)) == 0) { > /* > * Read the next row from xlog file. > * > - * xlog_cursor_next_xc() returns 1 when > + * xlog_cursor_next() returns 1 when > * it can not read more rows. This doesn't mean > * the file is fully read: it's fully read only > * when EOF marker has been read, see i.eof_read > */ > if (stop_vclock != NULL && > r->vclock.signature >= stop_vclock->signature) > - return; > + return 0; > int64_t current_lsn = vclock_get(&r->vclock, row.replica_id); > if (row.lsn <= current_lsn) > continue; /* already applied, skip */ > @@ -282,13 +286,16 @@ recover_xlog(struct recovery *r, struct xstream *stream, > row_count / 1000000.); > } else { > if (!r->wal_dir.force_recovery) > - diag_raise(); > + return -1; > > say_error("skipping row {%u: %lld}", > (unsigned)row.replica_id, (long long)row.lsn); > diag_log(); > } > } > + if (rc < 0) > + return -1; > + return 0; > } > > /** > @@ -302,14 +309,14 @@ recover_xlog(struct recovery *r, struct xstream *stream, > * This function will not close r->current_wal if > * recovery was successful. > */ > -void > +int > recover_remaining_wals(struct recovery *r, struct xstream *stream, > const struct vclock *stop_vclock, bool scan_dir) > { > struct vclock *clock; > > - if (scan_dir) > - xdir_scan_xc(&r->wal_dir); > + if (scan_dir && xdir_scan(&r->wal_dir) != 0) > + return -1; > > if (xlog_cursor_is_open(&r->cursor)) { > /* If there's a WAL open, recover from it first. 
*/ > @@ -343,21 +350,26 @@ recover_remaining_wals(struct recovery *r, struct xstream *stream, > continue; > } > > - recovery_open_log(r, clock); > + if (recovery_open_log(r, clock) != 0) > + return -1; > > say_info("recover from `%s'", r->cursor.name); > > recover_current_wal: > - recover_xlog(r, stream, stop_vclock); > + if (recover_xlog(r, stream, stop_vclock) != 0) > + return -1; > } > > if (xlog_cursor_is_eof(&r->cursor)) > recovery_close_log(r); > > - if (stop_vclock != NULL && vclock_compare(&r->vclock, stop_vclock) != 0) > - tnt_raise(XlogGapError, &r->vclock, stop_vclock); > + if (stop_vclock != NULL && vclock_compare(&r->vclock, stop_vclock) != 0) { > + diag_set(XlogGapError, &r->vclock, stop_vclock); > + return -1; > + } > > region_free(&fiber()->gc); > + return 0; > } > > void > @@ -481,7 +493,9 @@ hot_standby_f(va_list ap) > do { > start = vclock_sum(&r->vclock); > > - recover_remaining_wals(r, stream, NULL, scan_dir); > + if (recover_remaining_wals(r, stream, NULL, > + scan_dir) != 0) > + diag_raise(); > > end = vclock_sum(&r->vclock); > /* > @@ -529,7 +543,7 @@ recovery_follow_local(struct recovery *r, struct xstream *stream, > fiber_start(r->watcher, r, stream, wal_dir_rescan_delay); > } > > -void > +int > recovery_stop_local(struct recovery *r) > { > if (r->watcher) { > @@ -537,8 +551,9 @@ recovery_stop_local(struct recovery *r) > r->watcher = NULL; > fiber_cancel(f); > if (fiber_join(f) != 0) > - diag_raise(); > + return -1; > } > + return 0; > } > > /* }}} */ > diff --git a/src/box/recovery.h b/src/box/recovery.h > index 6e68abc0b..145d9199e 100644 > --- a/src/box/recovery.h > +++ b/src/box/recovery.h > @@ -74,7 +74,7 @@ recovery_delete(struct recovery *r); > * @gc_vclock is set to the oldest vclock available in the > * WAL directory. 
> */ > -void > +int > recovery_scan(struct recovery *r, struct vclock *end_vclock, > struct vclock *gc_vclock); > > @@ -82,16 +82,12 @@ void > recovery_follow_local(struct recovery *r, struct xstream *stream, > const char *name, ev_tstamp wal_dir_rescan_delay); > > -void > +int > recovery_stop_local(struct recovery *r); > > void > recovery_finalize(struct recovery *r); > > -#if defined(__cplusplus) > -} /* extern "C" */ > -#endif /* defined(__cplusplus) */ > - > /** > * Find out if there are new .xlog files since the current > * vclock, and read them all up. > @@ -102,8 +98,12 @@ recovery_finalize(struct recovery *r); > * This function will not close r->current_wal if > * recovery was successful. > */ > -void > +int > recover_remaining_wals(struct recovery *r, struct xstream *stream, > const struct vclock *stop_vclock, bool scan_dir); > > +#if defined(__cplusplus) > +} /* extern "C" */ > +#endif /* defined(__cplusplus) */ > + > #endif /* TARANTOOL_RECOVERY_H_INCLUDED */ > diff --git a/src/box/relay.cc b/src/box/relay.cc > index b89632273..d5a1c9c68 100644 > --- a/src/box/relay.cc > +++ b/src/box/relay.cc > @@ -334,8 +334,9 @@ relay_final_join_f(va_list ap) > > /* Send all WALs until stop_vclock */ > assert(relay->stream.write != NULL); > - recover_remaining_wals(relay->r, &relay->stream, > - &relay->stop_vclock, true); > + if (recover_remaining_wals(relay->r, &relay->stream, > + &relay->stop_vclock, true) != 0) > + diag_raise(); > assert(vclock_compare(&relay->r->vclock, &relay->stop_vclock) == 0); > return 0; > } > @@ -491,11 +492,9 @@ relay_process_wal_event(struct wal_watcher *watcher, unsigned events) > */ > return; > } > - try { > - recover_remaining_wals(relay->r, &relay->stream, NULL, > - (events & WAL_EVENT_ROTATE) != 0); > - } catch (Exception *e) { > - relay_set_error(relay, e); > + if (recover_remaining_wals(relay->r, &relay->stream, NULL, > + (events & WAL_EVENT_ROTATE) != 0) != 0) { > + relay_set_error(relay, diag_last_error(diag_get())); > 
fiber_cancel(fiber()); > } > } > @@ -702,6 +701,8 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync, > vclock_copy(&relay->local_vclock_at_subscribe, &replicaset.vclock); > relay->r = recovery_new(cfg_gets("wal_dir"), false, > replica_clock); > + if (relay->r == NULL) > + diag_raise(); > vclock_copy(&relay->tx.vclock, replica_clock); > relay->version_id = replica_version_id; > > -- > 2.25.0 -- Konstantin Osipov, Moscow, Russia
next prev parent reply other threads:[~2020-03-19 7:56 UTC|newest] Thread overview: 16+ messages / expand[flat|nested] mbox.gz Atom feed top 2020-02-12 9:39 [Tarantool-patches] [PATCH v4 00/11] Replication from memory Georgy Kirichenko 2020-02-12 9:39 ` [Tarantool-patches] [PATCH v4 01/11] recovery: do not call recovery_stop_local inside recovery_delete Georgy Kirichenko 2020-03-19 7:55 ` Konstantin Osipov 2020-02-12 9:39 ` [Tarantool-patches] [PATCH v4 02/11] recovery: do not throw an error Georgy Kirichenko 2020-03-19 7:56 ` Konstantin Osipov [this message] 2020-02-12 9:39 ` [Tarantool-patches] [PATCH v4 03/11] coio: do not allow parallel usage of coio Georgy Kirichenko 2020-03-19 18:09 ` Konstantin Osipov 2020-02-12 9:39 ` [Tarantool-patches] [PATCH v4 04/11] coio: do not throw an error, minor refactoring Georgy Kirichenko 2020-03-23 6:59 ` Konstantin Osipov 2020-02-12 9:39 ` [Tarantool-patches] [PATCH v4 05/11] xstream: get rid of an exception Georgy Kirichenko 2020-02-12 9:39 ` [Tarantool-patches] [PATCH v4 06/11] wal: extract log write batch into a separate routine Georgy Kirichenko 2020-02-12 9:39 ` [Tarantool-patches] [PATCH v4 07/11] wal: matrix clock structure Georgy Kirichenko 2020-02-12 9:39 ` [Tarantool-patches] [PATCH v4 08/11] wal: track relay vclock and collect logs in wal thread Georgy Kirichenko 2020-02-12 9:39 ` [Tarantool-patches] [PATCH v4 09/11] wal: xrow memory buffer and cursor Georgy Kirichenko 2020-02-12 9:39 ` [Tarantool-patches] [PATCH v4 10/11] wal: use a xrow buffer object for entry encoding Georgy Kirichenko 2020-02-12 9:39 ` [Tarantool-patches] [PATCH v4 11/11] replication: use wal memory buffer to fetch rows Georgy Kirichenko
Reply instructions: You may reply publicly to this message via plain-text email using any one of the following methods: * Save the following mbox file, import it into your mail client, and reply-to-all from there: mbox Avoid top-posting and favor interleaved quoting: https://en.wikipedia.org/wiki/Posting_style#Interleaved_style * Reply using the --to, --cc, and --in-reply-to switches of git-send-email(1): git send-email \ --in-reply-to=20200319075645.GC3227@atlas \ --to=kostja.osipov@gmail.com \ --cc=georgy@tarantool.org \ --cc=sergepetrenko@tarantool.org \ --cc=tarantool-patches@dev.tarantool.org \ --subject='Re: [Tarantool-patches] [PATCH v4 02/11] recovery: do not throw an error' \ /path/to/YOUR_REPLY https://kernel.org/pub/software/scm/git/docs/git-send-email.html * If your mail client supports setting the In-Reply-To header via mailto: links, try the mailto: link
This is a public inbox; see the mirroring instructions for how to clone and mirror all data and code used for this inbox.