From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: From: Vladimir Davydov Subject: [PATCH 03/10] vinyl: move vylog recovery to vylog thread Date: Fri, 17 May 2019 17:52:37 +0300 Message-Id: <69dfb6ed09c655e842ae9200598fce9c62176998.1558103547.git.vdavydov.dev@gmail.com> In-Reply-To: References: In-Reply-To: References: To: tarantool-patches@freelists.org List-ID: We used coio for loading vylog, because vylog was written from the WAL thread, which shouldn't be used for such a heavy operation as vylog recovery. Now that vylog has a dedicated thread, we can move recovery there. This allows us to simplify the rotation logic as well: now most of the work is done from the same function (vy_log_rotate_f) executed by the vylog thread, not scattered between coio and WAL as it used to be. This is a step toward removal of the vylog latch, which blocks the implementation of transactional DDL. --- src/box/vy_log.c | 174 ++++++++++++++++++++++++++++++++----------------------- 1 file changed, 100 insertions(+), 74 deletions(-) diff --git a/src/box/vy_log.c b/src/box/vy_log.c index 25ab73fd..cf967595 100644 --- a/src/box/vy_log.c +++ b/src/box/vy_log.c @@ -47,7 +47,6 @@ #include "assoc.h" #include "cbus.h" -#include "coio_task.h" #include "diag.h" #include "errcode.h" #include "errinj.h" @@ -191,7 +190,7 @@ struct vy_log { static struct vy_log vy_log; static struct vy_recovery * -vy_recovery_new_locked(int64_t signature, int flags); +vy_recovery_load(int64_t signature, int flags); static int vy_recovery_process_record(struct vy_recovery *recovery, @@ -203,8 +202,8 @@ vy_log_open(void); static int vy_log_create(const struct vclock *vclock, struct vy_recovery *recovery); -int -vy_log_rotate(const struct vclock *vclock); +static int +vy_log_flush(void); /** * Return the name of the vylog file that has the given signature.
@@ -1097,18 +1096,31 @@ vy_log_end_recovery(void) return 0; } -static ssize_t -vy_log_rotate_f(va_list ap) -{ - struct vy_recovery *recovery = va_arg(ap, struct vy_recovery *); - const struct vclock *vclock = va_arg(ap, const struct vclock *); - return vy_log_create(vclock, recovery); -} +struct vy_log_rotate_msg { + struct cbus_call_msg base; + struct vclock vclock; +}; static int -vy_log_close_f(struct cbus_call_msg *msg) +vy_log_rotate_f(struct cbus_call_msg *base) { - (void)msg; + struct vy_log_rotate_msg *msg = (struct vy_log_rotate_msg *)base; + int64_t prev_signature = vclock_sum(&vy_log.last_checkpoint); + + /* Load the last vylog into a recovery context. */ + struct vy_recovery *recovery = vy_recovery_load(prev_signature, 0); + if (recovery == NULL) + return -1; + + /* Write the contents of the recovery context to the new vylog. */ + int rc = vy_log_create(&msg->vclock, recovery); + vy_recovery_delete(recovery); + if (rc != 0) + return -1; + /* + * Success. Close the old log. The new one will be opened + * automatically on the first write (see vy_log_flush_f()). + */ if (xlog_is_open(&vy_log.xlog)) xlog_close(&vy_log.xlog, false); return 0; @@ -1145,30 +1157,24 @@ vy_log_rotate(const struct vclock *vclock) */ latch_lock(&vy_log.latch); - struct vy_recovery *recovery; - recovery = vy_recovery_new_locked(prev_signature, 0); - if (recovery == NULL) + if (vy_log_flush() != 0) { + diag_log(); + say_error("failed to flush vylog for checkpoint"); goto fail; + } - /* Do actual work from coio so as not to stall tx thread. */ - int rc = coio_call(vy_log_rotate_f, recovery, vclock); - vy_recovery_delete(recovery); - if (rc < 0) { - diag_log(); - say_error("failed to write `%s'", vy_log_filename(signature)); - goto fail; - } + /* Do actual work from the vylog thread so as not to stall tx thread. */ + struct vy_log_rotate_msg msg; + vclock_copy(&msg.vclock, vclock); - /* - * Success. Close the old log. The new one will be opened - * automatically on the first write (see vy_log_flush_f()).
- */ - struct cbus_call_msg msg; bool cancellable = fiber_set_cancellable(false); - cbus_call(&vy_log.pipe, &vy_log.tx_pipe, &msg, - vy_log_close_f, NULL, TIMEOUT_INFINITY); + int rc = cbus_call(&vy_log.pipe, &vy_log.tx_pipe, &msg.base, + vy_log_rotate_f, NULL, TIMEOUT_INFINITY); fiber_set_cancellable(cancellable); + if (rc != 0) + goto fail; + vclock_copy(&vy_log.last_checkpoint, vclock); /* Add the new vclock to the xdir so that we can track it. */ @@ -2281,13 +2287,14 @@ vy_recovery_build_index_id_hash(struct vy_recovery *recovery) return 0; } -static ssize_t -vy_recovery_new_f(va_list ap) +/** + * Load recovery context from the vylog file identified by + * the given signature. This is a blocking operation, which + * is supposed to be called from the vylog thread. + */ +static struct vy_recovery * +vy_recovery_load(int64_t signature, int flags) { - int64_t signature = va_arg(ap, int64_t); - int flags = va_arg(ap, int); - struct vy_recovery **p_recovery = va_arg(ap, struct vy_recovery **); - say_verbose("loading vylog %lld", (long long)signature); struct vy_recovery *recovery = malloc(sizeof(*recovery)); @@ -2369,58 +2376,71 @@ vy_recovery_new_f(va_list ap) goto fail_free; out: say_verbose("done loading vylog"); - *p_recovery = recovery; - return 0; + return recovery; fail_close: xlog_cursor_close(&cursor, false); fail_free: vy_recovery_delete(recovery); fail: - return -1; + diag_log(); + say_error("failed to load `%s'", vy_log_filename(signature)); + return NULL; } -/** - * Load the metadata log and return a recovery context. - * Must be called with the log latch held.
- */ -static struct vy_recovery * -vy_recovery_new_locked(int64_t signature, int flags) -{ - int rc; +struct vy_recovery_msg { + struct cbus_call_msg base; + int64_t signature; + int flags; struct vy_recovery *recovery; +}; + +static int +vy_recovery_new_f(struct cbus_call_msg *base) +{ + struct vy_recovery_msg *msg = (struct vy_recovery_msg *)base; + + msg->recovery = vy_recovery_load(msg->signature, msg->flags); + if (msg->recovery == NULL) + return -1; + + return 0; +} + +struct vy_recovery * +vy_recovery_new(int64_t signature, int flags) +{ + /* Lock out concurrent writers while we are loading the log. */ + latch_lock(&vy_log.latch); - assert(latch_owner(&vy_log.latch) == fiber()); /* * Before proceeding to log recovery, make sure that all * pending records have been flushed out. */ - rc = vy_log_flush(); - if (rc != 0) { + if (vy_log_flush() != 0) { diag_log(); say_error("failed to flush vylog for recovery"); - return NULL; + goto fail; } - /* Load the log from coio so as not to stall tx thread. */ - rc = coio_call(vy_recovery_new_f, signature, flags, &recovery); - if (rc != 0) { - diag_log(); - say_error("failed to load `%s'", vy_log_filename(signature)); - return NULL; - } - return recovery; -} + struct vy_recovery_msg msg; + msg.signature = signature; + msg.flags = flags; + msg.recovery = NULL; + + bool cancellable = fiber_set_cancellable(false); + int rc = cbus_call(&vy_log.pipe, &vy_log.tx_pipe, &msg.base, + vy_recovery_new_f, NULL, TIMEOUT_INFINITY); + fiber_set_cancellable(cancellable); -struct vy_recovery * -vy_recovery_new(int64_t signature, int flags) -{ - /* Lock out concurrent writers while we are loading the log.
*/ latch_lock(&vy_log.latch); - struct vy_recovery *recovery; - recovery = vy_recovery_new_locked(signature, flags); + if (rc != 0) + goto fail; + + latch_unlock(&vy_log.latch); + return msg.recovery; +fail: latch_unlock(&vy_log.latch); - return recovery; + return NULL; } void @@ -2563,6 +2583,8 @@ vy_log_append_lsm(struct xlog *xlog, struct vy_lsm_recovery_info *lsm) static int vy_log_create(const struct vclock *vclock, struct vy_recovery *recovery) { + struct errinj *inj = errinj(ERRINJ_VY_LOG_FILE_RENAME, ERRINJ_BOOL); + say_verbose("saving vylog %lld", (long long)vclock_sum(vclock)); /* @@ -2592,11 +2614,10 @@ vy_log_create(const struct vclock *vclock, struct vy_recovery *recovery) if (vy_log_append_record(&xlog, &record) != 0) goto err_write_xlog; - ERROR_INJECT(ERRINJ_VY_LOG_FILE_RENAME, { + if (inj && inj->bparam) { diag_set(ClientError, ER_INJECTION, "vinyl log file rename"); - xlog_close(&xlog, false); - return -1; - }); + goto err_write_xlog; + } /* Finalize the new xlog. */ if (xlog_flush(&xlog) < 0 || @@ -2610,13 +2631,18 @@ done: return 0; err_write_xlog: - /* Delete the unfinished xlog. */ + /* + * Delete the unfinished xlog unless ERRINJ_VY_LOG_FILE_RENAME + * is set (we use it to test collection of .inprogress files). + */ assert(xlog_is_open(&xlog)); - if (unlink(xlog.filename) < 0) + if ((!inj || !inj->bparam) && unlink(xlog.filename) < 0) say_syserror("failed to delete file '%s'", xlog.filename); xlog_close(&xlog, false); err_create_xlog: + diag_log(); + say_error("failed to write `%s'", vy_log_filename(vclock_sum(vclock))); return -1; } -- 2.11.0