[PATCH 03/10] vinyl: move vylog recovery to vylog thread

Vladimir Davydov vdavydov.dev at gmail.com
Fri May 17 17:52:37 MSK 2019


We used coio for vylog recovery because vylog was written from the WAL
thread, which shouldn't be used for such a heavy operation.
Now we can move recovery to the dedicated vylog thread. This also
allows us to simplify the rotation logic: most of the work is now done
in a single function (vy_log_rotate_f) executed by the vylog thread,
rather than being scattered between coio and WAL, as it used to be.

This is a step toward removing the vylog latch, which blocks
the implementation of transactional DDL.
---
 src/box/vy_log.c | 174 ++++++++++++++++++++++++++++++++-----------------------
 1 file changed, 100 insertions(+), 74 deletions(-)

diff --git a/src/box/vy_log.c b/src/box/vy_log.c
index 25ab73fd..cf967595 100644
--- a/src/box/vy_log.c
+++ b/src/box/vy_log.c
@@ -47,7 +47,6 @@
 
 #include "assoc.h"
 #include "cbus.h"
-#include "coio_task.h"
 #include "diag.h"
 #include "errcode.h"
 #include "errinj.h"
@@ -191,7 +190,7 @@ struct vy_log {
 static struct vy_log vy_log;
 
 static struct vy_recovery *
-vy_recovery_new_locked(int64_t signature, int flags);
+vy_recovery_load(int64_t signature, int flags);
 
 static int
 vy_recovery_process_record(struct vy_recovery *recovery,
@@ -203,8 +202,8 @@ vy_log_open(void);
 static int
 vy_log_create(const struct vclock *vclock, struct vy_recovery *recovery);
 
-int
-vy_log_rotate(const struct vclock *vclock);
+static int
+vy_log_flush(void);
 
 /**
  * Return the name of the vylog file that has the given signature.
@@ -1097,18 +1096,31 @@ vy_log_end_recovery(void)
 	return 0;
 }
 
-static ssize_t
-vy_log_rotate_f(va_list ap)
-{
-	struct vy_recovery *recovery = va_arg(ap, struct vy_recovery *);
-	const struct vclock *vclock = va_arg(ap, const struct vclock *);
-	return vy_log_create(vclock, recovery);
-}
+struct vy_log_rotate_msg {
+	struct cbus_call_msg base;
+	struct vclock vclock;
+};
 
 static int
-vy_log_close_f(struct cbus_call_msg *msg)
+vy_log_rotate_f(struct cbus_call_msg *base)
 {
-	(void)msg;
+	struct vy_log_rotate_msg *msg = (struct vy_log_rotate_msg *)base;
+	int64_t prev_signature = vclock_sum(&vy_log.last_checkpoint);
+
+	/* Load the last vylog into a recovery context. */
+	struct vy_recovery *recovery = vy_recovery_load(prev_signature, 0);
+	if (recovery == NULL)
+		return -1;
+
+	/* Write the contents of the recovery context to the new vylog. */
+	int rc = vy_log_create(&msg->vclock, recovery);
+	vy_recovery_delete(recovery);
+	if (rc != 0)
+		return -1;
+	/*
+	 * Success. Close the old log. The new one will be opened
+	 * automatically on the first write (see vy_log_flush_f()).
+	 */
 	if (xlog_is_open(&vy_log.xlog))
 		xlog_close(&vy_log.xlog, false);
 	return 0;
@@ -1145,30 +1157,24 @@ vy_log_rotate(const struct vclock *vclock)
 	 */
 	latch_lock(&vy_log.latch);
 
-	struct vy_recovery *recovery;
-	recovery = vy_recovery_new_locked(prev_signature, 0);
-	if (recovery == NULL)
+	if (vy_log_flush() != 0) {
+		diag_log();
+		say_error("failed to flush vylog for checkpoint");
 		goto fail;
+	}
 
 	/* Do actual work from coio so as not to stall tx thread. */
-	int rc = coio_call(vy_log_rotate_f, recovery, vclock);
-	vy_recovery_delete(recovery);
-	if (rc < 0) {
-		diag_log();
-		say_error("failed to write `%s'", vy_log_filename(signature));
-		goto fail;
-	}
+	struct vy_log_rotate_msg msg;
+	vclock_copy(&msg.vclock, vclock);
 
-	/*
-	 * Success. Close the old log. The new one will be opened
-	 * automatically on the first write (see vy_log_flush_f()).
-	 */
-	struct cbus_call_msg msg;
 	bool cancellable = fiber_set_cancellable(false);
-	cbus_call(&vy_log.pipe, &vy_log.tx_pipe, &msg,
-		  vy_log_close_f, NULL, TIMEOUT_INFINITY);
+	int rc = cbus_call(&vy_log.pipe, &vy_log.tx_pipe, &msg.base,
+			   vy_log_rotate_f, NULL, TIMEOUT_INFINITY);
 	fiber_set_cancellable(cancellable);
 
+	if (rc != 0)
+		goto fail;
+
 	vclock_copy(&vy_log.last_checkpoint, vclock);
 
 	/* Add the new vclock to the xdir so that we can track it. */
@@ -2281,13 +2287,14 @@ vy_recovery_build_index_id_hash(struct vy_recovery *recovery)
 	return 0;
 }
 
-static ssize_t
-vy_recovery_new_f(va_list ap)
+/**
+ * Load recovery context from the vylog file identified by
+ * the given signature. This is a blocking operation, which
+ * is supposed to be called from the vylog thread.
+ */
+static struct vy_recovery *
+vy_recovery_load(int64_t signature, int flags)
 {
-	int64_t signature = va_arg(ap, int64_t);
-	int flags = va_arg(ap, int);
-	struct vy_recovery **p_recovery = va_arg(ap, struct vy_recovery **);
-
 	say_verbose("loading vylog %lld", (long long)signature);
 
 	struct vy_recovery *recovery = malloc(sizeof(*recovery));
@@ -2369,58 +2376,71 @@ vy_recovery_new_f(va_list ap)
 		goto fail_free;
 out:
 	say_verbose("done loading vylog");
-	*p_recovery = recovery;
-	return 0;
+	return recovery;
 
 fail_close:
 	xlog_cursor_close(&cursor, false);
 fail_free:
 	vy_recovery_delete(recovery);
 fail:
-	return -1;
+	diag_log();
+	say_error("failed to load `%s'", vy_log_filename(signature));
+	return NULL;
 }
 
-/**
- * Load the metadata log and return a recovery context.
- * Must be called with the log latch held.
- */
-static struct vy_recovery *
-vy_recovery_new_locked(int64_t signature, int flags)
-{
-	int rc;
+struct vy_recovery_msg {
+	struct cbus_call_msg base;
+	int64_t signature; /* same width as the vy_recovery_new() arg */
+	int flags;
 	struct vy_recovery *recovery;
+};
+
+/** cbus callback: load the requested vylog in the vylog thread. */
+static int
+vy_recovery_new_f(struct cbus_call_msg *base)
+{
+	struct vy_recovery_msg *msg = (struct vy_recovery_msg *)base;
+
+	msg->recovery = vy_recovery_load(msg->signature, msg->flags);
+	if (msg->recovery == NULL)
+		return -1;
+	return 0;
+}
+
+struct vy_recovery *
+vy_recovery_new(int64_t signature, int flags)
+{
+	/* Lock out concurrent writers while we are loading the log. */
+	latch_lock(&vy_log.latch);
 
-	assert(latch_owner(&vy_log.latch) == fiber());
 	/*
 	 * Before proceeding to log recovery, make sure that all
 	 * pending records have been flushed out.
 	 */
-	rc = vy_log_flush();
-	if (rc != 0) {
+	if (vy_log_flush() != 0) {
 		diag_log();
 		say_error("failed to flush vylog for recovery");
-		return NULL;
+		goto fail;
 	}
 
-	/* Load the log from coio so as not to stall tx thread. */
-	rc = coio_call(vy_recovery_new_f, signature, flags, &recovery);
-	if (rc != 0) {
-		diag_log();
-		say_error("failed to load `%s'", vy_log_filename(signature));
-		return NULL;
-	}
-	return recovery;
-}
+	struct vy_recovery_msg msg;
+	msg.signature = signature;
+	msg.flags = flags;
+	msg.recovery = NULL;
+
+	bool cancellable = fiber_set_cancellable(false);
+	int rc = cbus_call(&vy_log.pipe, &vy_log.tx_pipe, &msg.base,
+			   vy_recovery_new_f, NULL, TIMEOUT_INFINITY);
+	fiber_set_cancellable(cancellable);
 
-struct vy_recovery *
-vy_recovery_new(int64_t signature, int flags)
-{
-	/* Lock out concurrent writers while we are loading the log. */
-	latch_lock(&vy_log.latch);
-	struct vy_recovery *recovery;
-	recovery = vy_recovery_new_locked(signature, flags);
+	if (rc != 0)
+		goto fail;
+
+	latch_unlock(&vy_log.latch);
+	return msg.recovery;
+fail:
 	latch_unlock(&vy_log.latch);
-	return recovery;
+	return NULL;
 }
 
 void
@@ -2563,6 +2583,8 @@ vy_log_append_lsm(struct xlog *xlog, struct vy_lsm_recovery_info *lsm)
 static int
 vy_log_create(const struct vclock *vclock, struct vy_recovery *recovery)
 {
+	struct errinj *inj = errinj(ERRINJ_VY_LOG_FILE_RENAME, ERRINJ_BOOL);
+
 	say_verbose("saving vylog %lld", (long long)vclock_sum(vclock));
 
 	/*
@@ -2592,11 +2614,10 @@ vy_log_create(const struct vclock *vclock, struct vy_recovery *recovery)
 	if (vy_log_append_record(&xlog, &record) != 0)
 		goto err_write_xlog;
 
-	ERROR_INJECT(ERRINJ_VY_LOG_FILE_RENAME, {
+	if (inj && inj->bparam) {
 		diag_set(ClientError, ER_INJECTION, "vinyl log file rename");
-		xlog_close(&xlog, false);
-		return -1;
-	});
+		goto err_write_xlog;
+	}
 
 	/* Finalize the new xlog. */
 	if (xlog_flush(&xlog) < 0 ||
@@ -2610,13 +2631,18 @@ done:
 	return 0;
 
 err_write_xlog:
-	/* Delete the unfinished xlog. */
+	/*
+	 * Delete the unfinished xlog unless ERRINJ_VY_LOG_FILE_RENAME
+	 * is set (we use it to test collection of .inprogress files).
+	 */
 	assert(xlog_is_open(&xlog));
-	if (unlink(xlog.filename) < 0)
+	if ((!inj || !inj->bparam) && unlink(xlog.filename) < 0)
 		say_syserror("failed to delete file '%s'",
 			     xlog.filename);
 	xlog_close(&xlog, false);
 
 err_create_xlog:
+	diag_log();
+	say_error("failed to write `%s'", vy_log_filename(vclock_sum(vclock)));
 	return -1;
 }
-- 
2.11.0




More information about the Tarantool-patches mailing list