[PATCH 03/10] vinyl: move vylog recovery to vylog thread
Vladimir Davydov
vdavydov.dev at gmail.com
Fri May 17 17:52:37 MSK 2019
We used to load vylog via coio, because vylog was written from the WAL
thread, which shouldn't be used for an operation as heavy as vylog recovery.
Now we can move it to the dedicated vylog thread. This also allows
us to simplify the rotation logic: most of the work is now done
by a single function (vy_log_rotate_f) executed by the vylog thread,
rather than being scattered between coio and WAL, as it used to be.
This is a step toward removal of the vylog latch, which blocks
the implementation of transactional DDL.
---
src/box/vy_log.c | 174 ++++++++++++++++++++++++++++++++-----------------------
1 file changed, 100 insertions(+), 74 deletions(-)
diff --git a/src/box/vy_log.c b/src/box/vy_log.c
index 25ab73fd..cf967595 100644
--- a/src/box/vy_log.c
+++ b/src/box/vy_log.c
@@ -47,7 +47,6 @@
#include "assoc.h"
#include "cbus.h"
-#include "coio_task.h"
#include "diag.h"
#include "errcode.h"
#include "errinj.h"
@@ -191,7 +190,7 @@ struct vy_log {
static struct vy_log vy_log;
static struct vy_recovery *
-vy_recovery_new_locked(int64_t signature, int flags);
+vy_recovery_load(int64_t signature, int flags);
static int
vy_recovery_process_record(struct vy_recovery *recovery,
@@ -203,8 +202,8 @@ vy_log_open(void);
static int
vy_log_create(const struct vclock *vclock, struct vy_recovery *recovery);
-int
-vy_log_rotate(const struct vclock *vclock);
+static int
+vy_log_flush(void);
/**
* Return the name of the vylog file that has the given signature.
@@ -1097,18 +1096,31 @@ vy_log_end_recovery(void)
return 0;
}
-static ssize_t
-vy_log_rotate_f(va_list ap)
-{
- struct vy_recovery *recovery = va_arg(ap, struct vy_recovery *);
- const struct vclock *vclock = va_arg(ap, const struct vclock *);
- return vy_log_create(vclock, recovery);
-}
+struct vy_log_rotate_msg {
+ struct cbus_call_msg base;
+ struct vclock vclock;
+};
static int
-vy_log_close_f(struct cbus_call_msg *msg)
+vy_log_rotate_f(struct cbus_call_msg *base)
{
- (void)msg;
+ struct vy_log_rotate_msg *msg = (struct vy_log_rotate_msg *)base;
+ int64_t prev_signature = vclock_sum(&vy_log.last_checkpoint);
+
+ /* Load the last vylog into a recovery context. */
+ struct vy_recovery *recovery = vy_recovery_load(prev_signature, 0);
+ if (recovery == NULL)
+ return -1;
+
+ /* Write the contents of the recovery context to the new vylog. */
+ int rc = vy_log_create(&msg->vclock, recovery);
+ vy_recovery_delete(recovery);
+ if (rc != 0)
+ return -1;
+ /*
+ * Success. Close the old log. The new one will be opened
+ * automatically on the first write (see vy_log_flush_f()).
+ */
if (xlog_is_open(&vy_log.xlog))
xlog_close(&vy_log.xlog, false);
return 0;
@@ -1145,30 +1157,24 @@ vy_log_rotate(const struct vclock *vclock)
*/
latch_lock(&vy_log.latch);
- struct vy_recovery *recovery;
- recovery = vy_recovery_new_locked(prev_signature, 0);
- if (recovery == NULL)
+ if (vy_log_flush() != 0) {
+ diag_log();
+ say_error("failed to flush vylog for checkpoint");
goto fail;
+ }
/* Do actual work from coio so as not to stall tx thread. */
- int rc = coio_call(vy_log_rotate_f, recovery, vclock);
- vy_recovery_delete(recovery);
- if (rc < 0) {
- diag_log();
- say_error("failed to write `%s'", vy_log_filename(signature));
- goto fail;
- }
+ struct vy_log_rotate_msg msg;
+ vclock_copy(&msg.vclock, vclock);
- /*
- * Success. Close the old log. The new one will be opened
- * automatically on the first write (see vy_log_flush_f()).
- */
- struct cbus_call_msg msg;
bool cancellable = fiber_set_cancellable(false);
- cbus_call(&vy_log.pipe, &vy_log.tx_pipe, &msg,
- vy_log_close_f, NULL, TIMEOUT_INFINITY);
+ int rc = cbus_call(&vy_log.pipe, &vy_log.tx_pipe, &msg.base,
+ vy_log_rotate_f, NULL, TIMEOUT_INFINITY);
fiber_set_cancellable(cancellable);
+ if (rc != 0)
+ goto fail;
+
vclock_copy(&vy_log.last_checkpoint, vclock);
/* Add the new vclock to the xdir so that we can track it. */
@@ -2281,13 +2287,14 @@ vy_recovery_build_index_id_hash(struct vy_recovery *recovery)
return 0;
}
-static ssize_t
-vy_recovery_new_f(va_list ap)
+/**
+ * Load recovery context from the vylog file identified by
+ * the given signature. This is a blocking operation, which
+ * is supposed to be called from the vylog thread.
+ */
+static struct vy_recovery *
+vy_recovery_load(int64_t signature, int flags)
{
- int64_t signature = va_arg(ap, int64_t);
- int flags = va_arg(ap, int);
- struct vy_recovery **p_recovery = va_arg(ap, struct vy_recovery **);
-
say_verbose("loading vylog %lld", (long long)signature);
struct vy_recovery *recovery = malloc(sizeof(*recovery));
@@ -2369,58 +2376,71 @@ vy_recovery_new_f(va_list ap)
goto fail_free;
out:
say_verbose("done loading vylog");
- *p_recovery = recovery;
- return 0;
+ return recovery;
fail_close:
xlog_cursor_close(&cursor, false);
fail_free:
vy_recovery_delete(recovery);
fail:
- return -1;
+ diag_log();
+ say_error("failed to load `%s'", vy_log_filename(signature));
+ return NULL;
}
-/**
- * Load the metadata log and return a recovery context.
- * Must be called with the log latch held.
- */
-static struct vy_recovery *
-vy_recovery_new_locked(int64_t signature, int flags)
-{
- int rc;
+struct vy_recovery_msg { /* request to load a vylog, see vy_recovery_new() */
+ struct cbus_call_msg base; /* keep first: callback casts base to msg */
+ int64_t signature; /* vylog signature; int64_t as in vy_recovery_new() */
+ int flags; /* passed through to vy_recovery_load() */
struct vy_recovery *recovery;
+};
+
+static int
+vy_recovery_new_f(struct cbus_call_msg *base)
+{
+ struct vy_recovery_msg *msg = (struct vy_recovery_msg *)base;
+ /* cbus callback: store the loaded context in msg, -1 on failure. */
+ msg->recovery = vy_recovery_load(msg->signature, msg->flags);
+ if (msg->recovery == NULL)
+ return -1;
+
+ return 0;
+}
+
+struct vy_recovery *
+vy_recovery_new(int64_t signature, int flags)
+{
+ /* Lock out concurrent writers while we are loading the log. */
+ latch_lock(&vy_log.latch);
- assert(latch_owner(&vy_log.latch) == fiber());
/*
* Before proceeding to log recovery, make sure that all
* pending records have been flushed out.
*/
- rc = vy_log_flush();
- if (rc != 0) {
+ if (vy_log_flush() != 0) {
diag_log();
say_error("failed to flush vylog for recovery");
- return NULL;
+ goto fail;
}
- /* Load the log from coio so as not to stall tx thread. */
- rc = coio_call(vy_recovery_new_f, signature, flags, &recovery);
- if (rc != 0) {
- diag_log();
- say_error("failed to load `%s'", vy_log_filename(signature));
- return NULL;
- }
- return recovery;
-}
+ struct vy_recovery_msg msg;
+ msg.signature = signature;
+ msg.flags = flags;
+ msg.recovery = NULL;
+
+ bool cancellable = fiber_set_cancellable(false);
+ int rc = cbus_call(&vy_log.pipe, &vy_log.tx_pipe, &msg.base,
+ vy_recovery_new_f, NULL, TIMEOUT_INFINITY);
+ fiber_set_cancellable(cancellable);
-struct vy_recovery *
-vy_recovery_new(int64_t signature, int flags)
-{
- /* Lock out concurrent writers while we are loading the log. */
- latch_lock(&vy_log.latch);
- struct vy_recovery *recovery;
- recovery = vy_recovery_new_locked(signature, flags);
+ if (rc != 0)
+ goto fail;
+
+ latch_unlock(&vy_log.latch);
+ return msg.recovery;
+fail:
latch_unlock(&vy_log.latch);
- return recovery;
+ return NULL;
}
void
@@ -2563,6 +2583,8 @@ vy_log_append_lsm(struct xlog *xlog, struct vy_lsm_recovery_info *lsm)
static int
vy_log_create(const struct vclock *vclock, struct vy_recovery *recovery)
{
+ struct errinj *inj = errinj(ERRINJ_VY_LOG_FILE_RENAME, ERRINJ_BOOL);
+
say_verbose("saving vylog %lld", (long long)vclock_sum(vclock));
/*
@@ -2592,11 +2614,10 @@ vy_log_create(const struct vclock *vclock, struct vy_recovery *recovery)
if (vy_log_append_record(&xlog, &record) != 0)
goto err_write_xlog;
- ERROR_INJECT(ERRINJ_VY_LOG_FILE_RENAME, {
+ if (inj && inj->bparam) {
diag_set(ClientError, ER_INJECTION, "vinyl log file rename");
- xlog_close(&xlog, false);
- return -1;
- });
+ goto err_write_xlog;
+ }
/* Finalize the new xlog. */
if (xlog_flush(&xlog) < 0 ||
@@ -2610,13 +2631,18 @@ done:
return 0;
err_write_xlog:
- /* Delete the unfinished xlog. */
+ /*
+ * Delete the unfinished xlog unless ERRINJ_VY_LOG_FILE_RENAME
+ * is set (we use it to test collection of .inprogress files).
+ */
assert(xlog_is_open(&xlog));
- if (unlink(xlog.filename) < 0)
+ if ((!inj || !inj->bparam) && unlink(xlog.filename) < 0)
say_syserror("failed to delete file '%s'",
xlog.filename);
xlog_close(&xlog, false);
err_create_xlog:
+ diag_log();
+ say_error("failed to write `%s'", vy_log_filename(vclock_sum(vclock)));
return -1;
}
--
2.11.0
More information about the Tarantool-patches
mailing list