From: Serge Petrenko via Tarantool-patches <tarantool-patches@dev.tarantool.org> To: v.shpilevoy@tarantool.org, gorcunov@gmail.com Cc: tarantool-patches@dev.tarantool.org Subject: [Tarantool-patches] [PATCH] recovery: make it yield when positioning in a WAL Date: Mon, 26 Apr 2021 19:59:54 +0300 [thread overview] Message-ID: <20210426165954.46474-1-sergepetrenko@tarantool.org> (raw) We had various places in box.cc and relay.cc which counted processed rows and yielded every now and then. These yields didn't cover the cases when recovery has to position inside a long WAL file. For example, when tarantool exits without leaving an empty WAL file that would be used to recover the instance vclock on restart. In this case the instance freezes while processing the last available WAL in order to recover the vclock. Another issue is with replication. If a replica connects and needs data from the end of a really long WAL, recovery will read up to the needed position without yields, causing the relay to disconnect by timeout. In order to fix the issue, make recovery decide when a yield should happen. Introduce a new callback, schedule_yield, which recovery calls once it has processed enough rows (WAL_ROWS_PER_YIELD), no matter how they were processed: either simply skipped or passed to xstream_write. schedule_yield either yields right away, in the case of relay, or saves the yield for later, in the case of local recovery, because local recovery might be in the middle of a transaction. The only place with explicit row counting and manual yielding is now relay_initial_join, since its rows come from the engines rather than from recovery and its WAL files. 
Closes #5979 --- https://github.com/tarantool/tarantool/tree/sp/gh-5979-recovery-yield https://github.com/tarantool/tarantool/issues/5979 src/box/box.cc | 32 +++++++++++++++----------------- src/box/recovery.cc | 26 +++++++++++++++++--------- src/box/recovery.h | 15 ++++++++++++++- src/box/relay.cc | 28 ++++++++++++++++++---------- 4 files changed, 64 insertions(+), 37 deletions(-) diff --git a/src/box/box.cc b/src/box/box.cc index 59925962d..69a8f87eb 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -335,8 +335,6 @@ struct wal_stream { * matches LSN of a first global row. */ bool has_global_row; - /** How many rows have been recovered so far. */ - size_t rows; }; /** @@ -570,12 +568,6 @@ end_diag_request: static void wal_stream_try_yield(struct wal_stream *stream) { - /* - * Save the yield. Otherwise it would happen only on rows which - * are a multiple of WAL_ROWS_PER_YIELD and are last in their - * transaction, which is probably a very rare coincidence. - */ - stream->has_yield |= (stream->rows % WAL_ROWS_PER_YIELD == 0); if (wal_stream_has_tx(stream) || !stream->has_yield) return; stream->has_yield = false; @@ -587,11 +579,6 @@ wal_stream_apply_row(struct xstream *base, struct xrow_header *row) { struct wal_stream *stream = container_of(base, struct wal_stream, base); - /* - * Account all rows, even non-DML, and even leading to an error. Because - * still need to yield sometimes. - */ - ++stream->rows; if (iproto_type_is_synchro_request(row->type)) { if (wal_stream_apply_synchro_row(stream, row) != 0) goto end_error; @@ -618,7 +605,6 @@ wal_stream_create(struct wal_stream *ctx) ctx->first_row_lsn = 0; ctx->has_yield = false; ctx->has_global_row = false; - ctx->rows = 0; } /* {{{ configuration bindings */ @@ -3101,6 +3087,19 @@ bootstrap(const struct tt_uuid *instance_uuid, } } +struct wal_stream wal_stream; + +/** + * Plan a yield in recovery stream. Wal stream will execute it as soon as it's + * ready. 
+ */ +static void +wal_stream_schedule_yield(void) +{ + wal_stream.has_yield = true; + wal_stream_try_yield(&wal_stream); +} + /** * Recover the instance from the local directory. * Enter hot standby if the directory is locked. @@ -3124,7 +3123,6 @@ local_recovery(const struct tt_uuid *instance_uuid, say_info("instance uuid %s", tt_uuid_str(&INSTANCE_UUID)); - struct wal_stream wal_stream; wal_stream_create(&wal_stream); auto stream_guard = make_scoped_guard([&]{ wal_stream_abort(&wal_stream); @@ -3132,8 +3130,8 @@ local_recovery(const struct tt_uuid *instance_uuid, struct recovery *recovery; bool is_force_recovery = cfg_geti("force_recovery"); - recovery = recovery_new(wal_dir(), is_force_recovery, - checkpoint_vclock); + recovery = recovery_new(wal_dir(), is_force_recovery, checkpoint_vclock, + wal_stream_schedule_yield); /* * Make sure we report the actual recovery position diff --git a/src/box/recovery.cc b/src/box/recovery.cc index cd33e7635..5351d8524 100644 --- a/src/box/recovery.cc +++ b/src/box/recovery.cc @@ -81,7 +81,7 @@ */ struct recovery * recovery_new(const char *wal_dirname, bool force_recovery, - const struct vclock *vclock) + const struct vclock *vclock, schedule_yield_f schedule_yield) { struct recovery *r = (struct recovery *) calloc(1, sizeof(*r)); @@ -113,6 +113,9 @@ recovery_new(const char *wal_dirname, bool force_recovery, r->watcher = NULL; rlist_create(&r->on_close_log); + r->row_count = 0; + r->schedule_yield = schedule_yield; + guard.is_active = false; return r; } @@ -139,8 +142,12 @@ recovery_scan(struct recovery *r, struct vclock *end_vclock, if (xdir_open_cursor(&r->wal_dir, vclock_sum(end_vclock), &cursor) != 0) return; struct xrow_header row; - while (xlog_cursor_next(&cursor, &row, true) == 0) + while (xlog_cursor_next(&cursor, &row, true) == 0) { vclock_follow_xrow(end_vclock, &row); + if (++r->row_count % WAL_ROWS_PER_YIELD == 0) { + r->schedule_yield(); + } + } xlog_cursor_close(&cursor, false); } @@ -241,10 +248,16 @@ static 
void recover_xlog(struct recovery *r, struct xstream *stream, const struct vclock *stop_vclock) { + /* Imitate old behaviour. Rows are counted separately for each xlog. */ + r->row_count = 0; struct xrow_header row; - uint64_t row_count = 0; while (xlog_cursor_next_xc(&r->cursor, &row, r->wal_dir.force_recovery) == 0) { + if (++r->row_count % WAL_ROWS_PER_YIELD == 0) { + r->schedule_yield(); + } + if (r->row_count % 100000 == 0) + say_info("%.1fM rows processed", r->row_count / 1000000.); /* * Read the next row from xlog file. * @@ -273,12 +286,7 @@ recover_xlog(struct recovery *r, struct xstream *stream, * failed row anyway. */ vclock_follow_xrow(&r->vclock, &row); - if (xstream_write(stream, &row) == 0) { - ++row_count; - if (row_count % 100000 == 0) - say_info("%.1fM rows processed", - row_count / 1000000.); - } else { + if (xstream_write(stream, &row) != 0) { if (!r->wal_dir.force_recovery) diag_raise(); diff --git a/src/box/recovery.h b/src/box/recovery.h index c8ccaa553..4212fd192 100644 --- a/src/box/recovery.h +++ b/src/box/recovery.h @@ -43,6 +43,12 @@ extern "C" { struct xrow_header; struct xstream; +/** + * A type for a callback invoked by recovery after some batch of rows is + * processed. Is used mostly to unblock the event loop every now and then. + */ +typedef void (*schedule_yield_f)(void); + struct recovery { struct vclock vclock; /** The WAL cursor we're currently reading/writing from/to. */ @@ -56,11 +62,18 @@ struct recovery { struct fiber *watcher; /** List of triggers invoked when the current WAL is closed. */ struct rlist on_close_log; + uint64_t row_count; + /** + * A callback recovery calls eveery WAL_ROWS_PER_YIELD processed rows. + * This includes both applied rows and the rows which were skipped to + * position recovery at the right position in xlog. 
+ */ + schedule_yield_f schedule_yield; }; struct recovery * recovery_new(const char *wal_dirname, bool force_recovery, - const struct vclock *vclock); + const struct vclock *vclock, schedule_yield_f schedule_yield); void recovery_delete(struct recovery *r); diff --git a/src/box/relay.cc b/src/box/relay.cc index ff43c2fc7..c7002bd46 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -418,6 +418,12 @@ relay_final_join_f(va_list ap) return 0; } +static void +relay_yield(void) +{ + fiber_sleep(0); +} + void relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock, struct vclock *stop_vclock) @@ -438,7 +444,7 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock, * always be valid. */ vclock_copy(&relay->recv_vclock, start_vclock); - relay->r = recovery_new(wal_dir(), false, start_vclock); + relay->r = recovery_new(wal_dir(), false, start_vclock, relay_yield); vclock_copy(&relay->stop_vclock, stop_vclock); int rc = cord_costart(&relay->cord, "final_join", @@ -904,7 +910,7 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync, * always be valid. */ vclock_copy(&relay->recv_vclock, replica_clock); - relay->r = recovery_new(wal_dir(), false, replica_clock); + relay->r = recovery_new(wal_dir(), false, replica_clock, relay_yield); vclock_copy(&relay->tx.vclock, replica_clock); relay->version_id = replica_version_id; @@ -928,13 +934,6 @@ relay_send(struct relay *relay, struct xrow_header *packet) coio_write_xrow(&relay->io, packet); fiber_gc(); - /* - * It may happen that the socket is always ready for write, so yield - * explicitly every now and then to not block the event loop. 
- */ - if (++relay->row_count % WAL_ROWS_PER_YIELD == 0) - fiber_sleep(0); - struct errinj *inj = errinj(ERRINJ_RELAY_TIMEOUT, ERRINJ_DOUBLE); if (inj != NULL && inj->dparam > 0) fiber_sleep(inj->dparam); @@ -944,6 +943,15 @@ static void relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row) { struct relay *relay = container_of(stream, struct relay, stream); + /* + * It may happen that the socket is always ready for write, so yield + * explicitly every now and then to not block the event loop. + * This is not part of relay_yield(), because here the engines are the + * source of rows, and not recovery. + */ + if (++relay->row_count % WAL_ROWS_PER_YIELD == 0) + fiber_sleep(0); + /* * Ignore replica local requests as we don't need to promote * vclock while sending a snapshot. @@ -982,7 +990,7 @@ relay_restart_recovery(struct relay *relay) struct vclock restart_vclock; vclock_copy(&restart_vclock, &relay->recv_vclock); vclock_reset(&restart_vclock, 0, vclock_get(&relay->r->vclock, 0)); - struct recovery *r = recovery_new(wal_dir(), false, &restart_vclock); + struct recovery *r = recovery_new(wal_dir(), false, &restart_vclock, relay_yield); rlist_swap(&relay->r->on_close_log, &r->on_close_log); recovery_delete(relay->r); relay->r = r; -- 2.24.3 (Apple Git-128)
next reply other threads:[~2021-04-26 17:00 UTC|newest] Thread overview: 7+ messages / expand[flat|nested] mbox.gz Atom feed top 2021-04-26 16:59 Serge Petrenko via Tarantool-patches [this message] 2021-04-26 21:20 ` Vladislav Shpilevoy via Tarantool-patches 2021-04-28 15:34 ` Serge Petrenko via Tarantool-patches 2021-04-28 20:50 ` Vladislav Shpilevoy via Tarantool-patches 2021-04-29 8:55 ` Serge Petrenko via Tarantool-patches 2021-04-29 20:03 ` Vladislav Shpilevoy via Tarantool-patches 2021-05-12 11:30 ` Serge Petrenko via Tarantool-patches
Reply instructions: You may reply publicly to this message via plain-text email using any one of the following methods: * Save the following mbox file, import it into your mail client, and reply-to-all from there: mbox Avoid top-posting and favor interleaved quoting: https://en.wikipedia.org/wiki/Posting_style#Interleaved_style * Reply using the --to, --cc, and --in-reply-to switches of git-send-email(1): git send-email \ --in-reply-to=20210426165954.46474-1-sergepetrenko@tarantool.org \ --to=tarantool-patches@dev.tarantool.org \ --cc=gorcunov@gmail.com \ --cc=sergepetrenko@tarantool.org \ --cc=v.shpilevoy@tarantool.org \ --subject='Re: [Tarantool-patches] [PATCH] recovery: make it yield when positioning in a WAL' \ /path/to/YOUR_REPLY https://kernel.org/pub/software/scm/git/docs/git-send-email.html * If your mail client supports setting the In-Reply-To header via mailto: links, try the mailto: link
This is a public inbox, see mirroring instructions for how to clone and mirror all data and code used for this inbox