From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: From: Vladimir Davydov Subject: [PATCH 02/10] vinyl: add a separate thread for vylog Date: Fri, 17 May 2019 17:52:36 +0300 Message-Id: <16044855a7f1cb73e13baaa6ccd20dfdc0c9e48f.1558103547.git.vdavydov.dev@gmail.com> In-Reply-To: References: In-Reply-To: References: To: tarantool-patches@freelists.org List-ID: Historically, we have used the WAL thread for writing vylog files, because, I guess, we didn't want to introduce a separate thread. However, that design decision turned out to be quite dubious: - vy_log (vy_log.c) calls vy_log_writer (wal.c) while vy_log_writer calls back to vy_log. That is, we have a piece of logic split crudely between two files, which makes the code difficult to follow and just looks ugly. - We can't make vy_log part of vy_env because of this relationship. In fact, vy_log is the last singleton in the whole vy implementation: everything else is wrapped neatly (well, not quite, but still) in the vy_env struct. - We can't kill the infamous vy_log.latch, even though removing it is a prerequisite for transactional DDL. The latch is needed to sync between vy_log readers and writers. You see, currently we can't read vy_log in the same thread where we write it, which would eliminate the need for any kind of synchronization, because a vy_log read is quite a heavy operation - it may stall WAL writes and thus badly affect latency. So we have to do it from a coio thread. That being said, it's time to move the vy_log writer back to where it belongs - vy_log.c, running in a separate thread. This patch does just that. It doesn't patch the implementation to fix the flaws enumerated above - this will be done by the following patches. 
--- src/box/vy_log.c | 113 +++++++++++++++++++++++++++++++++++++++++++++++++++---- src/box/vy_log.h | 6 --- src/box/wal.c | 79 -------------------------------------- src/box/wal.h | 15 -------- 4 files changed, 105 insertions(+), 108 deletions(-) diff --git a/src/box/vy_log.c b/src/box/vy_log.c index edd61b33..25ab73fd 100644 --- a/src/box/vy_log.c +++ b/src/box/vy_log.c @@ -46,19 +46,20 @@ #include #include "assoc.h" +#include "cbus.h" #include "coio_task.h" #include "diag.h" #include "errcode.h" #include "errinj.h" #include "fiber.h" #include "iproto_constants.h" /* IPROTO_INSERT */ +#include "journal.h" #include "key_def.h" #include "latch.h" #include "replication.h" /* INSTANCE_UUID */ #include "salad/stailq.h" #include "say.h" #include "tt_static.h" -#include "wal.h" #include "vclock.h" #include "xlog.h" #include "xrow.h" @@ -178,6 +179,14 @@ struct vy_log { * only relevant if @tx_failed is set. */ struct diag tx_diag; + /** Vylog file. */ + struct xlog xlog; + /** Vylog IO thread. */ + struct cord cord; + /** Pipe to the vylog thread. */ + struct cpipe pipe; + /** Returning pipe from the vylog thread back to tx. */ + struct cpipe tx_pipe; }; static struct vy_log vy_log; @@ -189,6 +198,9 @@ vy_recovery_process_record(struct vy_recovery *recovery, const struct vy_log_record *record); static int +vy_log_open(void); + +static int vy_log_create(const struct vclock *vclock, struct vy_recovery *recovery); int @@ -731,6 +743,26 @@ err: return NULL; } +/** Vylog thread main loop. 
*/ +static int +vy_log_thread_f(va_list ap) +{ + (void)ap; + + struct cbus_endpoint endpoint; + cbus_endpoint_create(&endpoint, "vylog", fiber_schedule_cb, fiber()); + + cpipe_create(&vy_log.tx_pipe, "tx"); + + cbus_loop(&endpoint); + + if (xlog_is_open(&vy_log.xlog)) + xlog_close(&vy_log.xlog, false); + + cpipe_destroy(&vy_log.tx_pipe); + return 0; +} + void vy_log_init(const char *dir) { @@ -740,7 +772,45 @@ vy_log_init(const char *dir) region_create(&vy_log.pool, cord_slab_cache()); stailq_create(&vy_log.tx); diag_create(&vy_log.tx_diag); - wal_init_vy_log(); + xlog_clear(&vy_log.xlog); + + if (cord_costart(&vy_log.cord, "vinyl.log", vy_log_thread_f, NULL) != 0) + panic_syserror("failed to start vinyl log thread"); + + cpipe_create(&vy_log.pipe, "vylog"); +} + +struct vy_log_flush_msg { + struct cbus_call_msg base; + struct journal_entry *entry; +}; + +static int +vy_log_flush_f(struct cbus_call_msg *base) +{ + struct vy_log_flush_msg *msg = (struct vy_log_flush_msg *)base; + struct journal_entry *entry = msg->entry; + struct xlog *xlog = &vy_log.xlog; + + if (!xlog_is_open(xlog)) { + if (vy_log_open() < 0) + return -1; + } + + xlog_tx_begin(xlog); + for (int i = 0; i < entry->n_rows; ++i) { + entry->rows[i]->tm = ev_now(loop()); + if (xlog_write_row(xlog, entry->rows[i]) < 0) { + xlog_tx_rollback(xlog); + return -1; + } + } + + if (xlog_tx_commit(xlog) < 0 || + xlog_flush(xlog) < 0) + return -1; + + return 0; } /** @@ -798,10 +868,16 @@ vy_log_flush(void) assert(i == tx_size); /* - * Do actual disk writes on behalf of the WAL + * Do actual disk writes on behalf of the vylog thread * so as not to block the tx thread. */ - if (wal_write_vy_log(entry) != 0) + struct vy_log_flush_msg msg; + msg.entry = entry; + bool cancellable = fiber_set_cancellable(false); + int rc = cbus_call(&vy_log.pipe, &vy_log.tx_pipe, &msg.base, + vy_log_flush_f, NULL, TIMEOUT_INFINITY); + fiber_set_cancellable(cancellable); + if (rc != 0) goto err; /* Success. Free flushed records. 
*/ @@ -817,14 +893,21 @@ err: void vy_log_free(void) { + cbus_stop_loop(&vy_log.pipe); + if (cord_join(&vy_log.cord) != 0) + panic_syserror("failed to join vinyl log thread"); xdir_destroy(&vy_log.dir); region_destroy(&vy_log.pool); diag_destroy(&vy_log.tx_diag); } -int -vy_log_open(struct xlog *xlog) +/** + * Open current vy_log file. + */ +static int +vy_log_open(void) { + struct xlog *xlog = &vy_log.xlog; /* * Open the current log file or create a new one * if it doesn't exist. @@ -1022,6 +1105,15 @@ vy_log_rotate_f(va_list ap) return vy_log_create(vclock, recovery); } +static int +vy_log_close_f(struct cbus_call_msg *msg) +{ + (void)msg; + if (xlog_is_open(&vy_log.xlog)) + xlog_close(&vy_log.xlog, false); + return 0; +} + int vy_log_rotate(const struct vclock *vclock) { @@ -1069,9 +1161,14 @@ vy_log_rotate(const struct vclock *vclock) /* * Success. Close the old log. The new one will be opened - * automatically on the first write (see wal_write_vy_log()). + * automatically on the first write (see vy_log_flush_f()). */ - wal_rotate_vy_log(); + struct cbus_call_msg msg; + bool cancellable = fiber_set_cancellable(false); + cbus_call(&vy_log.pipe, &vy_log.tx_pipe, &msg, + vy_log_close_f, NULL, TIMEOUT_INFINITY); + fiber_set_cancellable(cancellable); + vclock_copy(&vy_log.last_checkpoint, vclock); /* Add the new vclock to the xdir so that we can track it. */ diff --git a/src/box/vy_log.h b/src/box/vy_log.h index ee38c193..81f62706 100644 --- a/src/box/vy_log.h +++ b/src/box/vy_log.h @@ -439,12 +439,6 @@ void vy_log_free(void); /** - * Open current vy_log file. - */ -int -vy_log_open(struct xlog *xlog); - -/** * Rotate the metadata log. This function creates a new * xlog file in the log directory having vclock @vclock * and writes records required to recover active LSM trees. 
diff --git a/src/box/wal.c b/src/box/wal.c index 25bceb68..1f93e162 100644 --- a/src/box/wal.c +++ b/src/box/wal.c @@ -39,7 +39,6 @@ #include "xlog.h" #include "xrow.h" -#include "vy_log.h" #include "cbus.h" #include "coio_task.h" #include "replication.h" @@ -173,15 +172,6 @@ struct wal_msg { struct vclock vclock; }; -/** - * Vinyl metadata log writer. - */ -struct vy_log_writer { - /** The metadata log file. */ - struct xlog xlog; -}; - -static struct vy_log_writer vy_log_writer; static struct wal_writer wal_writer_singleton; enum wal_mode @@ -1115,9 +1105,6 @@ wal_writer_f(va_list ap) if (xlog_is_open(&writer->current_wal)) xlog_close(&writer->current_wal, false); - if (xlog_is_open(&vy_log_writer.xlog)) - xlog_close(&vy_log_writer.xlog, false); - cpipe_destroy(&writer->tx_prio_pipe); return 0; } @@ -1198,72 +1185,6 @@ wal_write_in_wal_mode_none(struct journal *journal, return vclock_sum(&writer->vclock); } -void -wal_init_vy_log() -{ - xlog_clear(&vy_log_writer.xlog); -} - -struct wal_write_vy_log_msg -{ - struct cbus_call_msg base; - struct journal_entry *entry; -}; - -static int -wal_write_vy_log_f(struct cbus_call_msg *msg) -{ - struct journal_entry *entry = - ((struct wal_write_vy_log_msg *)msg)->entry; - - if (! 
xlog_is_open(&vy_log_writer.xlog)) { - if (vy_log_open(&vy_log_writer.xlog) < 0) - return -1; - } - - if (xlog_write_entry(&vy_log_writer.xlog, entry) < 0) - return -1; - - if (xlog_flush(&vy_log_writer.xlog) < 0) - return -1; - - return 0; -} - -int -wal_write_vy_log(struct journal_entry *entry) -{ - struct wal_writer *writer = &wal_writer_singleton; - struct wal_write_vy_log_msg msg; - msg.entry= entry; - bool cancellable = fiber_set_cancellable(false); - int rc = cbus_call(&writer->wal_pipe, &writer->tx_prio_pipe, - &msg.base, wal_write_vy_log_f, NULL, - TIMEOUT_INFINITY); - fiber_set_cancellable(cancellable); - return rc; -} - -static int -wal_rotate_vy_log_f(struct cbus_call_msg *msg) -{ - (void) msg; - if (xlog_is_open(&vy_log_writer.xlog)) - xlog_close(&vy_log_writer.xlog, false); - return 0; -} - -void -wal_rotate_vy_log() -{ - struct wal_writer *writer = &wal_writer_singleton; - struct cbus_call_msg msg; - bool cancellable = fiber_set_cancellable(false); - cbus_call(&writer->wal_pipe, &writer->tx_prio_pipe, &msg, - wal_rotate_vy_log_f, NULL, TIMEOUT_INFINITY); - fiber_set_cancellable(cancellable); -} - static void wal_watcher_notify(struct wal_watcher *watcher, unsigned events) { diff --git a/src/box/wal.h b/src/box/wal.h index a88a3f23..8448f6d4 100644 --- a/src/box/wal.h +++ b/src/box/wal.h @@ -222,21 +222,6 @@ wal_set_checkpoint_threshold(int64_t threshold); void wal_collect_garbage(const struct vclock *vclock); -void -wal_init_vy_log(); - -/** - * Write xrows to the vinyl metadata log. - */ -int -wal_write_vy_log(struct journal_entry *req); - -/** - * Rotate the vinyl metadata log. - */ -void -wal_rotate_vy_log(); - #if defined(__cplusplus) } /* extern "C" */ #endif /* defined(__cplusplus) */ -- 2.11.0