[Tarantool-patches] [PATCH] recovery: make it yield when positioning in a WAL

Serge Petrenko sergepetrenko at tarantool.org
Mon Apr 26 19:59:54 MSK 2021


We had various places in box.cc and relay.cc which counted processed
rows and yielded every now and then. These yields didn't cover cases,
when recovery has to position inside a long WAL file:

For example, when tarantool exits without leaving an empty WAL file
which'll be used to recover instance vclock on restart. In this case
the instance freezes while processing the last availabe WAL in order
to recover the vclock.

Another issue is with replication. If a replica connects and needs data
from the end of a really long WAL, recovery will read up to the needed
position without yields, making relay disconnect by timeout.

In order to fix the issue, make recovery decide when a yield should
happen. Introduce a new callback: schedule_yield, which is called by
recovery once it processes (no matter how, either simply skips or calls
xstream_write) enough rows (WAL_ROWS_PER_YIELD).

schedule_yield either yields right away, in case of relay, or saves the
yield for later, in case of local recovery, because it might be in the
middle of a transaction.

The only place with explicit row counting and manual yielding is now in
relay_initial_join, since its row sources are engines rather than recovery
with its WAL files.

Closes #5979
---
https://github.com/tarantool/tarantool/tree/sp/gh-5979-recovery-yield
https://github.com/tarantool/tarantool/issues/5979

 src/box/box.cc      | 32 +++++++++++++++-----------------
 src/box/recovery.cc | 26 +++++++++++++++++---------
 src/box/recovery.h  | 15 ++++++++++++++-
 src/box/relay.cc    | 28 ++++++++++++++++++----------
 4 files changed, 64 insertions(+), 37 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index 59925962d..69a8f87eb 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -335,8 +335,6 @@ struct wal_stream {
 	 * matches LSN of a first global row.
 	 */
 	bool has_global_row;
-	/** How many rows have been recovered so far. */
-	size_t rows;
 };
 
 /**
@@ -570,12 +568,6 @@ end_diag_request:
 static void
 wal_stream_try_yield(struct wal_stream *stream)
 {
-	/*
-	 * Save the yield. Otherwise it would happen only on rows which
-	 * are a multiple of WAL_ROWS_PER_YIELD and are last in their
-	 * transaction, which is probably a very rare coincidence.
-	 */
-	stream->has_yield |= (stream->rows % WAL_ROWS_PER_YIELD == 0);
 	if (wal_stream_has_tx(stream) || !stream->has_yield)
 		return;
 	stream->has_yield = false;
@@ -587,11 +579,6 @@ wal_stream_apply_row(struct xstream *base, struct xrow_header *row)
 {
 	struct wal_stream *stream =
 		container_of(base, struct wal_stream, base);
-	/*
-	 * Account all rows, even non-DML, and even leading to an error. Because
-	 * still need to yield sometimes.
-	 */
-	++stream->rows;
 	if (iproto_type_is_synchro_request(row->type)) {
 		if (wal_stream_apply_synchro_row(stream, row) != 0)
 			goto end_error;
@@ -618,7 +605,6 @@ wal_stream_create(struct wal_stream *ctx)
 	ctx->first_row_lsn = 0;
 	ctx->has_yield = false;
 	ctx->has_global_row = false;
-	ctx->rows = 0;
 }
 
 /* {{{ configuration bindings */
@@ -3101,6 +3087,19 @@ bootstrap(const struct tt_uuid *instance_uuid,
 	}
 }
 
+struct wal_stream wal_stream;
+
+/**
+ * Plan a yield in recovery stream. Wal stream will execute it as soon as it's
+ * ready.
+ */
+static void
+wal_stream_schedule_yield(void)
+{
+	wal_stream.has_yield = true;
+	wal_stream_try_yield(&wal_stream);
+}
+
 /**
  * Recover the instance from the local directory.
  * Enter hot standby if the directory is locked.
@@ -3124,7 +3123,6 @@ local_recovery(const struct tt_uuid *instance_uuid,
 
 	say_info("instance uuid %s", tt_uuid_str(&INSTANCE_UUID));
 
-	struct wal_stream wal_stream;
 	wal_stream_create(&wal_stream);
 	auto stream_guard = make_scoped_guard([&]{
 		wal_stream_abort(&wal_stream);
@@ -3132,8 +3130,8 @@ local_recovery(const struct tt_uuid *instance_uuid,
 
 	struct recovery *recovery;
 	bool is_force_recovery = cfg_geti("force_recovery");
-	recovery = recovery_new(wal_dir(), is_force_recovery,
-				checkpoint_vclock);
+	recovery = recovery_new(wal_dir(), is_force_recovery, checkpoint_vclock,
+				wal_stream_schedule_yield);
 
 	/*
 	 * Make sure we report the actual recovery position
diff --git a/src/box/recovery.cc b/src/box/recovery.cc
index cd33e7635..5351d8524 100644
--- a/src/box/recovery.cc
+++ b/src/box/recovery.cc
@@ -81,7 +81,7 @@
  */
 struct recovery *
 recovery_new(const char *wal_dirname, bool force_recovery,
-	     const struct vclock *vclock)
+	     const struct vclock *vclock, schedule_yield_f schedule_yield)
 {
 	struct recovery *r = (struct recovery *)
 			calloc(1, sizeof(*r));
@@ -113,6 +113,9 @@ recovery_new(const char *wal_dirname, bool force_recovery,
 	r->watcher = NULL;
 	rlist_create(&r->on_close_log);
 
+	r->row_count = 0;
+	r->schedule_yield = schedule_yield;
+
 	guard.is_active = false;
 	return r;
 }
@@ -139,8 +142,12 @@ recovery_scan(struct recovery *r, struct vclock *end_vclock,
 	if (xdir_open_cursor(&r->wal_dir, vclock_sum(end_vclock), &cursor) != 0)
 		return;
 	struct xrow_header row;
-	while (xlog_cursor_next(&cursor, &row, true) == 0)
+	while (xlog_cursor_next(&cursor, &row, true) == 0) {
 		vclock_follow_xrow(end_vclock, &row);
+		if (++r->row_count % WAL_ROWS_PER_YIELD == 0) {
+			r->schedule_yield();
+		}
+	}
 	xlog_cursor_close(&cursor, false);
 }
 
@@ -241,10 +248,16 @@ static void
 recover_xlog(struct recovery *r, struct xstream *stream,
 	     const struct vclock *stop_vclock)
 {
+	/* Imitate old behaviour. Rows are counted separately for each xlog. */
+	r->row_count = 0;
 	struct xrow_header row;
-	uint64_t row_count = 0;
 	while (xlog_cursor_next_xc(&r->cursor, &row,
 				   r->wal_dir.force_recovery) == 0) {
+		if (++r->row_count % WAL_ROWS_PER_YIELD == 0) {
+			r->schedule_yield();
+		}
+		if (r->row_count % 100000 == 0)
+			say_info("%.1fM rows processed", r->row_count / 1000000.);
 		/*
 		 * Read the next row from xlog file.
 		 *
@@ -273,12 +286,7 @@ recover_xlog(struct recovery *r, struct xstream *stream,
 		 * failed row anyway.
 		 */
 		vclock_follow_xrow(&r->vclock, &row);
-		if (xstream_write(stream, &row) == 0) {
-			++row_count;
-			if (row_count % 100000 == 0)
-				say_info("%.1fM rows processed",
-					 row_count / 1000000.);
-		} else {
+		if (xstream_write(stream, &row) != 0) {
 			if (!r->wal_dir.force_recovery)
 				diag_raise();
 
diff --git a/src/box/recovery.h b/src/box/recovery.h
index c8ccaa553..4212fd192 100644
--- a/src/box/recovery.h
+++ b/src/box/recovery.h
@@ -43,6 +43,12 @@ extern "C" {
 struct xrow_header;
 struct xstream;
 
+/**
+ * A type for a callback invoked by recovery after some batch of rows is
+ * processed. Is used mostly to unblock the event loop every now and then.
+ */
+typedef void (*schedule_yield_f)(void);
+
 struct recovery {
 	struct vclock vclock;
 	/** The WAL cursor we're currently reading/writing from/to. */
@@ -56,11 +62,18 @@ struct recovery {
 	struct fiber *watcher;
 	/** List of triggers invoked when the current WAL is closed. */
 	struct rlist on_close_log;
+	uint64_t row_count;
+	/**
+	 * A callback recovery calls eveery WAL_ROWS_PER_YIELD processed rows.
+	 * This includes both applied rows and the rows which were skipped to
+	 * position recovery at the right position in xlog.
+	 */
+	schedule_yield_f schedule_yield;
 };
 
 struct recovery *
 recovery_new(const char *wal_dirname, bool force_recovery,
-	     const struct vclock *vclock);
+	     const struct vclock *vclock, schedule_yield_f schedule_yield);
 
 void
 recovery_delete(struct recovery *r);
diff --git a/src/box/relay.cc b/src/box/relay.cc
index ff43c2fc7..c7002bd46 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -418,6 +418,12 @@ relay_final_join_f(va_list ap)
 	return 0;
 }
 
+static void
+relay_yield(void)
+{
+	fiber_sleep(0);
+}
+
 void
 relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock,
 		 struct vclock *stop_vclock)
@@ -438,7 +444,7 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock,
 	 * always be valid.
 	 */
 	vclock_copy(&relay->recv_vclock, start_vclock);
-	relay->r = recovery_new(wal_dir(), false, start_vclock);
+	relay->r = recovery_new(wal_dir(), false, start_vclock, relay_yield);
 	vclock_copy(&relay->stop_vclock, stop_vclock);
 
 	int rc = cord_costart(&relay->cord, "final_join",
@@ -904,7 +910,7 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
 	 * always be valid.
 	 */
 	vclock_copy(&relay->recv_vclock, replica_clock);
-	relay->r = recovery_new(wal_dir(), false, replica_clock);
+	relay->r = recovery_new(wal_dir(), false, replica_clock, relay_yield);
 	vclock_copy(&relay->tx.vclock, replica_clock);
 	relay->version_id = replica_version_id;
 
@@ -928,13 +934,6 @@ relay_send(struct relay *relay, struct xrow_header *packet)
 	coio_write_xrow(&relay->io, packet);
 	fiber_gc();
 
-	/*
-	 * It may happen that the socket is always ready for write, so yield
-	 * explicitly every now and then to not block the event loop.
-	 */
-	if (++relay->row_count % WAL_ROWS_PER_YIELD == 0)
-		fiber_sleep(0);
-
 	struct errinj *inj = errinj(ERRINJ_RELAY_TIMEOUT, ERRINJ_DOUBLE);
 	if (inj != NULL && inj->dparam > 0)
 		fiber_sleep(inj->dparam);
@@ -944,6 +943,15 @@ static void
 relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row)
 {
 	struct relay *relay = container_of(stream, struct relay, stream);
+	/*
+	 * It may happen that the socket is always ready for write, so yield
+	 * explicitly every now and then to not block the event loop.
+	 * This is not part of relay_yield(), because here the engines are the
+	 * source of rows, and not recovery.
+	 */
+	if (++relay->row_count % WAL_ROWS_PER_YIELD == 0)
+		fiber_sleep(0);
+
 	/*
 	 * Ignore replica local requests as we don't need to promote
 	 * vclock while sending a snapshot.
@@ -982,7 +990,7 @@ relay_restart_recovery(struct relay *relay)
 	struct vclock restart_vclock;
 	vclock_copy(&restart_vclock, &relay->recv_vclock);
 	vclock_reset(&restart_vclock, 0, vclock_get(&relay->r->vclock, 0));
-	struct recovery *r = recovery_new(wal_dir(), false, &restart_vclock);
+	struct recovery *r = recovery_new(wal_dir(), false, &restart_vclock, relay_yield);
 	rlist_swap(&relay->r->on_close_log, &r->on_close_log);
 	recovery_delete(relay->r);
 	relay->r = r;
-- 
2.24.3 (Apple Git-128)



More information about the Tarantool-patches mailing list