Tarantool development patches archive
 help / color / mirror / Atom feed
From: Serge Petrenko via Tarantool-patches <tarantool-patches@dev.tarantool.org>
To: v.shpilevoy@tarantool.org, gorcunov@gmail.com
Cc: tarantool-patches@dev.tarantool.org
Subject: [Tarantool-patches] [PATCH] recovery: make it yield when positioning in a WAL
Date: Mon, 26 Apr 2021 19:59:54 +0300
Message-ID: <20210426165954.46474-1-sergepetrenko@tarantool.org> (raw)

We had various places in box.cc and relay.cc which counted processed
rows and yielded every now and then. These yields didn't cover the cases
when recovery has to position itself inside a long WAL file.

For example, tarantool may exit without leaving an empty WAL file,
which would be used to recover the instance vclock on restart. In this
case the instance freezes while processing the last available WAL in
order to recover the vclock.

Another issue is with replication. If a replica connects and needs data
from the end of a really long WAL, recovery will read up to the needed
position without yields, making relay disconnect by timeout.

In order to fix the issue, make recovery decide when a yield should
happen. Introduce a new callback: schedule_yield, which is called by
recovery once it processes (no matter how, either simply skips or calls
xstream_write) enough rows (WAL_ROWS_PER_YIELD).

schedule_yield either yields right away, in case of relay, or saves the
yield for later, in case of local recovery, because it might be in the
middle of a transaction.

The only place with explicit row counting and manual yielding is now in
relay_initial_join, since its row sources are engines rather than recovery
with its WAL files.

Closes #5979
---
https://github.com/tarantool/tarantool/tree/sp/gh-5979-recovery-yield
https://github.com/tarantool/tarantool/issues/5979

 src/box/box.cc      | 32 +++++++++++++++-----------------
 src/box/recovery.cc | 26 +++++++++++++++++---------
 src/box/recovery.h  | 15 ++++++++++++++-
 src/box/relay.cc    | 28 ++++++++++++++++++----------
 4 files changed, 64 insertions(+), 37 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index 59925962d..69a8f87eb 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -335,8 +335,6 @@ struct wal_stream {
 	 * matches LSN of a first global row.
 	 */
 	bool has_global_row;
-	/** How many rows have been recovered so far. */
-	size_t rows;
 };
 
 /**
@@ -570,12 +568,6 @@ end_diag_request:
 static void
 wal_stream_try_yield(struct wal_stream *stream)
 {
-	/*
-	 * Save the yield. Otherwise it would happen only on rows which
-	 * are a multiple of WAL_ROWS_PER_YIELD and are last in their
-	 * transaction, which is probably a very rare coincidence.
-	 */
-	stream->has_yield |= (stream->rows % WAL_ROWS_PER_YIELD == 0);
 	if (wal_stream_has_tx(stream) || !stream->has_yield)
 		return;
 	stream->has_yield = false;
@@ -587,11 +579,6 @@ wal_stream_apply_row(struct xstream *base, struct xrow_header *row)
 {
 	struct wal_stream *stream =
 		container_of(base, struct wal_stream, base);
-	/*
-	 * Account all rows, even non-DML, and even leading to an error. Because
-	 * still need to yield sometimes.
-	 */
-	++stream->rows;
 	if (iproto_type_is_synchro_request(row->type)) {
 		if (wal_stream_apply_synchro_row(stream, row) != 0)
 			goto end_error;
@@ -618,7 +605,6 @@ wal_stream_create(struct wal_stream *ctx)
 	ctx->first_row_lsn = 0;
 	ctx->has_yield = false;
 	ctx->has_global_row = false;
-	ctx->rows = 0;
 }
 
 /* {{{ configuration bindings */
@@ -3101,6 +3087,19 @@ bootstrap(const struct tt_uuid *instance_uuid,
 	}
 }
 
+struct wal_stream wal_stream;
+
+/**
+ * Plan a yield in recovery stream. Wal stream will execute it as soon as it's
+ * ready.
+ */
+static void
+wal_stream_schedule_yield(void)
+{
+	wal_stream.has_yield = true;
+	wal_stream_try_yield(&wal_stream);
+}
+
 /**
  * Recover the instance from the local directory.
  * Enter hot standby if the directory is locked.
@@ -3124,7 +3123,6 @@ local_recovery(const struct tt_uuid *instance_uuid,
 
 	say_info("instance uuid %s", tt_uuid_str(&INSTANCE_UUID));
 
-	struct wal_stream wal_stream;
 	wal_stream_create(&wal_stream);
 	auto stream_guard = make_scoped_guard([&]{
 		wal_stream_abort(&wal_stream);
@@ -3132,8 +3130,8 @@ local_recovery(const struct tt_uuid *instance_uuid,
 
 	struct recovery *recovery;
 	bool is_force_recovery = cfg_geti("force_recovery");
-	recovery = recovery_new(wal_dir(), is_force_recovery,
-				checkpoint_vclock);
+	recovery = recovery_new(wal_dir(), is_force_recovery, checkpoint_vclock,
+				wal_stream_schedule_yield);
 
 	/*
 	 * Make sure we report the actual recovery position
diff --git a/src/box/recovery.cc b/src/box/recovery.cc
index cd33e7635..5351d8524 100644
--- a/src/box/recovery.cc
+++ b/src/box/recovery.cc
@@ -81,7 +81,7 @@
  */
 struct recovery *
 recovery_new(const char *wal_dirname, bool force_recovery,
-	     const struct vclock *vclock)
+	     const struct vclock *vclock, schedule_yield_f schedule_yield)
 {
 	struct recovery *r = (struct recovery *)
 			calloc(1, sizeof(*r));
@@ -113,6 +113,9 @@ recovery_new(const char *wal_dirname, bool force_recovery,
 	r->watcher = NULL;
 	rlist_create(&r->on_close_log);
 
+	r->row_count = 0;
+	r->schedule_yield = schedule_yield;
+
 	guard.is_active = false;
 	return r;
 }
@@ -139,8 +142,12 @@ recovery_scan(struct recovery *r, struct vclock *end_vclock,
 	if (xdir_open_cursor(&r->wal_dir, vclock_sum(end_vclock), &cursor) != 0)
 		return;
 	struct xrow_header row;
-	while (xlog_cursor_next(&cursor, &row, true) == 0)
+	while (xlog_cursor_next(&cursor, &row, true) == 0) {
 		vclock_follow_xrow(end_vclock, &row);
+		if (++r->row_count % WAL_ROWS_PER_YIELD == 0) {
+			r->schedule_yield();
+		}
+	}
 	xlog_cursor_close(&cursor, false);
 }
 
@@ -241,10 +248,16 @@ static void
 recover_xlog(struct recovery *r, struct xstream *stream,
 	     const struct vclock *stop_vclock)
 {
+	/* Imitate old behaviour. Rows are counted separately for each xlog. */
+	r->row_count = 0;
 	struct xrow_header row;
-	uint64_t row_count = 0;
 	while (xlog_cursor_next_xc(&r->cursor, &row,
 				   r->wal_dir.force_recovery) == 0) {
+		if (++r->row_count % WAL_ROWS_PER_YIELD == 0) {
+			r->schedule_yield();
+		}
+		if (r->row_count % 100000 == 0)
+			say_info("%.1fM rows processed", r->row_count / 1000000.);
 		/*
 		 * Read the next row from xlog file.
 		 *
@@ -273,12 +286,7 @@ recover_xlog(struct recovery *r, struct xstream *stream,
 		 * failed row anyway.
 		 */
 		vclock_follow_xrow(&r->vclock, &row);
-		if (xstream_write(stream, &row) == 0) {
-			++row_count;
-			if (row_count % 100000 == 0)
-				say_info("%.1fM rows processed",
-					 row_count / 1000000.);
-		} else {
+		if (xstream_write(stream, &row) != 0) {
 			if (!r->wal_dir.force_recovery)
 				diag_raise();
 
diff --git a/src/box/recovery.h b/src/box/recovery.h
index c8ccaa553..4212fd192 100644
--- a/src/box/recovery.h
+++ b/src/box/recovery.h
@@ -43,6 +43,12 @@ extern "C" {
 struct xrow_header;
 struct xstream;
 
+/**
+ * A type for a callback invoked by recovery after some batch of rows is
+ * processed. Is used mostly to unblock the event loop every now and then.
+ */
+typedef void (*schedule_yield_f)(void);
+
 struct recovery {
 	struct vclock vclock;
 	/** The WAL cursor we're currently reading/writing from/to. */
@@ -56,11 +62,18 @@ struct recovery {
 	struct fiber *watcher;
 	/** List of triggers invoked when the current WAL is closed. */
 	struct rlist on_close_log;
+	uint64_t row_count;
+	/**
+	 * A callback recovery calls every WAL_ROWS_PER_YIELD processed rows.
+	 * This includes both applied rows and the rows which were skipped to
+	 * position recovery at the right position in xlog.
+	 */
+	schedule_yield_f schedule_yield;
 };
 
 struct recovery *
 recovery_new(const char *wal_dirname, bool force_recovery,
-	     const struct vclock *vclock);
+	     const struct vclock *vclock, schedule_yield_f schedule_yield);
 
 void
 recovery_delete(struct recovery *r);
diff --git a/src/box/relay.cc b/src/box/relay.cc
index ff43c2fc7..c7002bd46 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -418,6 +418,12 @@ relay_final_join_f(va_list ap)
 	return 0;
 }
 
+static void
+relay_yield(void)
+{
+	fiber_sleep(0);
+}
+
 void
 relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock,
 		 struct vclock *stop_vclock)
@@ -438,7 +444,7 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock,
 	 * always be valid.
 	 */
 	vclock_copy(&relay->recv_vclock, start_vclock);
-	relay->r = recovery_new(wal_dir(), false, start_vclock);
+	relay->r = recovery_new(wal_dir(), false, start_vclock, relay_yield);
 	vclock_copy(&relay->stop_vclock, stop_vclock);
 
 	int rc = cord_costart(&relay->cord, "final_join",
@@ -904,7 +910,7 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
 	 * always be valid.
 	 */
 	vclock_copy(&relay->recv_vclock, replica_clock);
-	relay->r = recovery_new(wal_dir(), false, replica_clock);
+	relay->r = recovery_new(wal_dir(), false, replica_clock, relay_yield);
 	vclock_copy(&relay->tx.vclock, replica_clock);
 	relay->version_id = replica_version_id;
 
@@ -928,13 +934,6 @@ relay_send(struct relay *relay, struct xrow_header *packet)
 	coio_write_xrow(&relay->io, packet);
 	fiber_gc();
 
-	/*
-	 * It may happen that the socket is always ready for write, so yield
-	 * explicitly every now and then to not block the event loop.
-	 */
-	if (++relay->row_count % WAL_ROWS_PER_YIELD == 0)
-		fiber_sleep(0);
-
 	struct errinj *inj = errinj(ERRINJ_RELAY_TIMEOUT, ERRINJ_DOUBLE);
 	if (inj != NULL && inj->dparam > 0)
 		fiber_sleep(inj->dparam);
@@ -944,6 +943,15 @@ static void
 relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row)
 {
 	struct relay *relay = container_of(stream, struct relay, stream);
+	/*
+	 * It may happen that the socket is always ready for write, so yield
+	 * explicitly every now and then to not block the event loop.
+	 * This is not part of relay_yield(), because here the engines are the
+	 * source of rows, and not recovery.
+	 */
+	if (++relay->row_count % WAL_ROWS_PER_YIELD == 0)
+		fiber_sleep(0);
+
 	/*
 	 * Ignore replica local requests as we don't need to promote
 	 * vclock while sending a snapshot.
@@ -982,7 +990,7 @@ relay_restart_recovery(struct relay *relay)
 	struct vclock restart_vclock;
 	vclock_copy(&restart_vclock, &relay->recv_vclock);
 	vclock_reset(&restart_vclock, 0, vclock_get(&relay->r->vclock, 0));
-	struct recovery *r = recovery_new(wal_dir(), false, &restart_vclock);
+	struct recovery *r = recovery_new(wal_dir(), false, &restart_vclock, relay_yield);
 	rlist_swap(&relay->r->on_close_log, &r->on_close_log);
 	recovery_delete(relay->r);
 	relay->r = r;
-- 
2.24.3 (Apple Git-128)


             reply	other threads:[~2021-04-26 17:00 UTC|newest]

Thread overview: 7+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2021-04-26 16:59 Serge Petrenko via Tarantool-patches [this message]
2021-04-26 21:20 ` Vladislav Shpilevoy via Tarantool-patches
2021-04-28 15:34   ` Serge Petrenko via Tarantool-patches
2021-04-28 20:50     ` Vladislav Shpilevoy via Tarantool-patches
2021-04-29  8:55       ` Serge Petrenko via Tarantool-patches
2021-04-29 20:03         ` Vladislav Shpilevoy via Tarantool-patches
2021-05-12 11:30           ` Serge Petrenko via Tarantool-patches

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20210426165954.46474-1-sergepetrenko@tarantool.org \
    --to=tarantool-patches@dev.tarantool.org \
    --cc=gorcunov@gmail.com \
    --cc=sergepetrenko@tarantool.org \
    --cc=v.shpilevoy@tarantool.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

Tarantool development patches archive

This inbox may be cloned and mirrored by anyone:

	git clone --mirror https://lists.tarantool.org/tarantool-patches/0 tarantool-patches/git/0.git

	# If you have public-inbox 1.1+ installed, you may
	# initialize and index your mirror using the following commands:
	public-inbox-init -V2 tarantool-patches tarantool-patches/ https://lists.tarantool.org/tarantool-patches \
		tarantool-patches@dev.tarantool.org.
	public-inbox-index tarantool-patches

Example config snippet for mirrors.


AGPL code for this site: git clone https://public-inbox.org/public-inbox.git