[Tarantool-patches] [PATCH v2] recovery: make it yield when positioning in a WAL
    Serge Petrenko 
    sergepetrenko at tarantool.org
       
    Wed May 12 14:29:28 MSK 2021
    
    
  
We had various places in box.cc and relay.cc which counted processed
rows and yielded every now and then. These yields didn't cover cases
when recovery has to position inside a long WAL file.
For example, when tarantool exits without leaving an empty WAL file,
which would be used to recover the instance vclock on restart, the
instance freezes on restart while processing the last available WAL in
order to recover the vclock.
Another issue is with replication. If a replica connects and needs data
from the end of a really long WAL, recovery will read up to the needed
position without yields, making the relay disconnect by timeout.
In order to fix the issue, make recovery decide when a yield should
happen. Once recovery decides so, it calls an xstream callback,
schedule_yield. Currently schedule_yield is fired once recovery
processes (either skips or writes) WAL_ROWS_PER_YIELD rows.
schedule_yield either yields right away, in case of relay, or saves the
yield for later, in case of local recovery, because it might be in the
middle of a transaction.
Closes #5979
---
https://github.com/tarantool/tarantool/tree/sp/gh-5979-recovery-yield
https://github.com/tarantool/tarantool/issues/5979
Changes in v2:
  - move schedule_yield to xstream
 .../unreleased/gh-5979-recovery-ligs.md       | 11 ++++++
 src/box/box.cc                                | 37 +++++++++----------
 src/box/memtx_engine.c                        |  1 +
 src/box/recovery.cc                           | 23 +++++++-----
 src/box/recovery.h                            |  2 +-
 src/box/relay.cc                              | 16 ++++----
 src/box/vinyl.c                               |  1 +
 src/box/xstream.h                             | 26 ++++++++++++-
 8 files changed, 79 insertions(+), 38 deletions(-)
 create mode 100644 changelogs/unreleased/gh-5979-recovery-ligs.md
diff --git a/changelogs/unreleased/gh-5979-recovery-ligs.md b/changelogs/unreleased/gh-5979-recovery-ligs.md
new file mode 100644
index 000000000..86abfd66a
--- /dev/null
+++ b/changelogs/unreleased/gh-5979-recovery-ligs.md
@@ -0,0 +1,11 @@
+# bugfix/core
+
+* Now tarantool yields when scanning `.xlog` files for the latest applied vclock
+  and when finding the right place in `.xlog`s to start recovering. This means
+  that the instance is responsive right after the `box.cfg` call even when an
+  empty `.xlog` was not created on previous exit.
+  Also this prevents relay from timing out when a freshly subscribed replica
+  needs rows from the end of a relatively long (hundreds of MBs) `.xlog`
+  (gh-5979).
+* The counter in `x.yM rows processed` log messages no longer resets on each
+  newly recovered `.xlog`.
diff --git a/src/box/box.cc b/src/box/box.cc
index 59925962d..8a7b8593d 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -335,8 +335,6 @@ struct wal_stream {
 	 * matches LSN of a first global row.
 	 */
 	bool has_global_row;
-	/** How many rows have been recovered so far. */
-	size_t rows;
 };
 
 /**
@@ -570,12 +568,6 @@ end_diag_request:
 static void
 wal_stream_try_yield(struct wal_stream *stream)
 {
-	/*
-	 * Save the yield. Otherwise it would happen only on rows which
-	 * are a multiple of WAL_ROWS_PER_YIELD and are last in their
-	 * transaction, which is probably a very rare coincidence.
-	 */
-	stream->has_yield |= (stream->rows % WAL_ROWS_PER_YIELD == 0);
 	if (wal_stream_has_tx(stream) || !stream->has_yield)
 		return;
 	stream->has_yield = false;
@@ -587,11 +579,6 @@ wal_stream_apply_row(struct xstream *base, struct xrow_header *row)
 {
 	struct wal_stream *stream =
 		container_of(base, struct wal_stream, base);
-	/*
-	 * Account all rows, even non-DML, and even leading to an error. Because
-	 * still need to yield sometimes.
-	 */
-	++stream->rows;
 	if (iproto_type_is_synchro_request(row->type)) {
 		if (wal_stream_apply_synchro_row(stream, row) != 0)
 			goto end_error;
@@ -610,15 +597,28 @@ end_error:
 	diag_raise();
 }
 
+static struct wal_stream wal_stream;
+
+/**
+ * Plan a yield in recovery stream. Wal stream will execute it as soon as it's
+ * ready.
+ */
+static void
+wal_stream_schedule_yield(void)
+{
+	wal_stream.has_yield = true;
+	wal_stream_try_yield(&wal_stream);
+}
+
 static void
 wal_stream_create(struct wal_stream *ctx)
 {
-	xstream_create(&ctx->base, wal_stream_apply_row);
+	xstream_create(&ctx->base, wal_stream_apply_row,
+		       wal_stream_schedule_yield);
 	ctx->tsn = 0;
 	ctx->first_row_lsn = 0;
 	ctx->has_yield = false;
 	ctx->has_global_row = false;
-	ctx->rows = 0;
 }
 
 /* {{{ configuration bindings */
@@ -3124,7 +3124,6 @@ local_recovery(const struct tt_uuid *instance_uuid,
 
 	say_info("instance uuid %s", tt_uuid_str(&INSTANCE_UUID));
 
-	struct wal_stream wal_stream;
 	wal_stream_create(&wal_stream);
 	auto stream_guard = make_scoped_guard([&]{
 		wal_stream_abort(&wal_stream);
@@ -3132,8 +3131,7 @@ local_recovery(const struct tt_uuid *instance_uuid,
 
 	struct recovery *recovery;
 	bool is_force_recovery = cfg_geti("force_recovery");
-	recovery = recovery_new(wal_dir(), is_force_recovery,
-				checkpoint_vclock);
+	recovery = recovery_new(wal_dir(), is_force_recovery, checkpoint_vclock);
 
 	/*
 	 * Make sure we report the actual recovery position
@@ -3151,7 +3149,8 @@ local_recovery(const struct tt_uuid *instance_uuid,
 	 * so we must reflect this in replicaset vclock to
 	 * not attempt to apply these rows twice.
 	 */
-	recovery_scan(recovery, &replicaset.vclock, &gc.vclock);
+	recovery_scan(recovery, &replicaset.vclock, &gc.vclock,
+		      &wal_stream.base);
 	say_info("instance vclock %s", vclock_to_string(&replicaset.vclock));
 
 	if (wal_dir_lock >= 0) {
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index e076cd71d..6c4982b9f 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -956,6 +956,7 @@ memtx_engine_join(struct engine *engine, void *arg, struct xstream *stream)
 	memtx->replica_join_cord = &cord;
 	int res = cord_cojoin(&cord);
 	memtx->replica_join_cord = NULL;
+	xstream_reset(stream);
 	return res;
 }
 
diff --git a/src/box/recovery.cc b/src/box/recovery.cc
index cd33e7635..e5b88fcc2 100644
--- a/src/box/recovery.cc
+++ b/src/box/recovery.cc
@@ -119,7 +119,7 @@ recovery_new(const char *wal_dirname, bool force_recovery,
 
 void
 recovery_scan(struct recovery *r, struct vclock *end_vclock,
-	      struct vclock *gc_vclock)
+	      struct vclock *gc_vclock, struct xstream *stream)
 {
 	xdir_scan_xc(&r->wal_dir, true);
 
@@ -139,9 +139,14 @@ recovery_scan(struct recovery *r, struct vclock *end_vclock,
 	if (xdir_open_cursor(&r->wal_dir, vclock_sum(end_vclock), &cursor) != 0)
 		return;
 	struct xrow_header row;
-	while (xlog_cursor_next(&cursor, &row, true) == 0)
+	while (xlog_cursor_next(&cursor, &row, true) == 0) {
 		vclock_follow_xrow(end_vclock, &row);
+		if (++stream->row_count % WAL_ROWS_PER_YIELD == 0)
+			xstream_schedule_yield(stream);
+	}
 	xlog_cursor_close(&cursor, false);
+
+	xstream_reset(stream);
 }
 
 static inline void
@@ -242,9 +247,14 @@ recover_xlog(struct recovery *r, struct xstream *stream,
 	     const struct vclock *stop_vclock)
 {
 	struct xrow_header row;
-	uint64_t row_count = 0;
 	while (xlog_cursor_next_xc(&r->cursor, &row,
 				   r->wal_dir.force_recovery) == 0) {
+		if (++stream->row_count % WAL_ROWS_PER_YIELD == 0) {
+			xstream_schedule_yield(stream);
+		}
+		if (stream->row_count % 100000 == 0)
+			say_info("%.1fM rows processed",
+				 stream->row_count / 1000000.);
 		/*
 		 * Read the next row from xlog file.
 		 *
@@ -273,12 +283,7 @@ recover_xlog(struct recovery *r, struct xstream *stream,
 		 * failed row anyway.
 		 */
 		vclock_follow_xrow(&r->vclock, &row);
-		if (xstream_write(stream, &row) == 0) {
-			++row_count;
-			if (row_count % 100000 == 0)
-				say_info("%.1fM rows processed",
-					 row_count / 1000000.);
-		} else {
+		if (xstream_write(stream, &row) != 0) {
 			if (!r->wal_dir.force_recovery)
 				diag_raise();
 
diff --git a/src/box/recovery.h b/src/box/recovery.h
index c8ccaa553..987932903 100644
--- a/src/box/recovery.h
+++ b/src/box/recovery.h
@@ -74,7 +74,7 @@ recovery_delete(struct recovery *r);
  */
 void
 recovery_scan(struct recovery *r,  struct vclock *end_vclock,
-	      struct vclock *gc_vclock);
+	      struct vclock *gc_vclock, struct xstream *stream);
 
 void
 recovery_follow_local(struct recovery *r, struct xstream *stream,
diff --git a/src/box/relay.cc b/src/box/relay.cc
index ff43c2fc7..81ac35bf2 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -260,11 +260,18 @@ relay_new(struct replica *replica)
 	return relay;
 }
 
+/** A callback recovery calls every now and then to unblock the event loop. */
+static void
+relay_yield(void)
+{
+	fiber_sleep(0);
+}
+
 static void
 relay_start(struct relay *relay, int fd, uint64_t sync,
 	     void (*stream_write)(struct xstream *, struct xrow_header *))
 {
-	xstream_create(&relay->stream, stream_write);
+	xstream_create(&relay->stream, stream_write, relay_yield);
 	/*
 	 * Clear the diagnostics at start, in case it has the old
 	 * error message which we keep around to display in
@@ -928,13 +935,6 @@ relay_send(struct relay *relay, struct xrow_header *packet)
 	coio_write_xrow(&relay->io, packet);
 	fiber_gc();
 
-	/*
-	 * It may happen that the socket is always ready for write, so yield
-	 * explicitly every now and then to not block the event loop.
-	 */
-	if (++relay->row_count % WAL_ROWS_PER_YIELD == 0)
-		fiber_sleep(0);
-
 	struct errinj *inj = errinj(ERRINJ_RELAY_TIMEOUT, ERRINJ_DOUBLE);
 	if (inj != NULL && inj->dparam > 0)
 		fiber_sleep(inj->dparam);
diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index b53e97593..647566623 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -3033,6 +3033,7 @@ vinyl_engine_join(struct engine *engine, void *arg, struct xstream *stream)
 		if (++loops % VY_YIELD_LOOPS == 0)
 			fiber_sleep(0);
 	}
+	xstream_reset(stream);
 	return 0;
 }
 
diff --git a/src/box/xstream.h b/src/box/xstream.h
index d29ff4213..d27de09a3 100644
--- a/src/box/xstream.h
+++ b/src/box/xstream.h
@@ -41,16 +41,40 @@ extern "C" {
 struct xrow_header;
 struct xstream;
 
+/**
+ * A type for a callback invoked by recovery after some batch of rows is
+ * processed. Is used mostly to unblock the event loop every now and then.
+ */
+typedef void (*schedule_yield_f)(void);
+
 typedef void (*xstream_write_f)(struct xstream *, struct xrow_header *);
 
 struct xstream {
 	xstream_write_f write;
+	schedule_yield_f schedule_yield;
+	uint64_t row_count;
 };
 
 static inline void
-xstream_create(struct xstream *xstream, xstream_write_f write)
+xstream_create(struct xstream *xstream, xstream_write_f write,
+	       schedule_yield_f schedule_yield)
 {
 	xstream->write = write;
+	xstream->schedule_yield = schedule_yield;
+	xstream->row_count = 0;
+}
+
+static inline void
+xstream_schedule_yield(struct xstream *stream)
+{
+	stream->schedule_yield();
+}
+
+static inline void
+xstream_reset(struct xstream *stream)
+{
+	stream->row_count = 0;
+	xstream_schedule_yield(stream);
 }
 
 int
-- 
2.30.1 (Apple Git-130)
    
    
More information about the Tarantool-patches
mailing list