[PATCH 02/10] vinyl: add a separate thread for vylog
Vladimir Davydov
vdavydov.dev at gmail.com
Fri May 17 17:52:36 MSK 2019
Historically, we have used the WAL thread for writing vylog files,
presumably because we didn't want to introduce a separate thread.
However, that design decision turned out to be quite dubious:
- vy_log (vy_log.c) calls vy_log_writer (wal.c), while vy_log_writer
calls back into vy_log. That is, a single piece of logic is crudely
split between two files, which makes the code difficult to follow and
just looks ugly.
- We can't make vy_log part of vy_env because of this relationship.
In fact, vy_log is the last singleton in the whole vy implementation:
everything else is wrapped neatly (well, not quite, but still) in
the vy_env struct.
- We can't kill the infamous vy_log.latch, and getting rid of it is a
prerequisite for transactional DDL. The latch is needed to synchronize
vy_log readers and writers. Reading vy_log in the same thread where we
write it would eliminate the need for any kind of synchronization, but
currently we can't do that: reading vy_log is quite a heavy operation,
and doing it in the WAL thread may stall WAL writes and thus badly
affect latency. So we have to read it from a coio thread.
That being said, it's time to move the vy_log writer back to where it
belongs: vy_log.c, running in a separate thread. This patch does just
that. It doesn't fix the flaws enumerated above yet - that will be done
by the following patches.
---
src/box/vy_log.c | 113 +++++++++++++++++++++++++++++++++++++++++++++++++++----
src/box/vy_log.h | 6 ---
src/box/wal.c | 79 --------------------------------------
src/box/wal.h | 15 --------
4 files changed, 105 insertions(+), 108 deletions(-)
diff --git a/src/box/vy_log.c b/src/box/vy_log.c
index edd61b33..25ab73fd 100644
--- a/src/box/vy_log.c
+++ b/src/box/vy_log.c
@@ -46,19 +46,20 @@
#include <small/rlist.h>
#include "assoc.h"
+#include "cbus.h"
#include "coio_task.h"
#include "diag.h"
#include "errcode.h"
#include "errinj.h"
#include "fiber.h"
#include "iproto_constants.h" /* IPROTO_INSERT */
+#include "journal.h"
#include "key_def.h"
#include "latch.h"
#include "replication.h" /* INSTANCE_UUID */
#include "salad/stailq.h"
#include "say.h"
#include "tt_static.h"
-#include "wal.h"
#include "vclock.h"
#include "xlog.h"
#include "xrow.h"
@@ -178,6 +179,14 @@ struct vy_log {
* only relevant if @tx_failed is set.
*/
struct diag tx_diag;
+ /** Vylog file. */
+ struct xlog xlog;
+ /** Vylog IO thread. */
+ struct cord cord;
+ /** Pipe to the vylog thread. */
+ struct cpipe pipe;
+ /** Returning pipe from the vylog thread back to tx. */
+ struct cpipe tx_pipe;
};
static struct vy_log vy_log;
@@ -189,6 +198,9 @@ vy_recovery_process_record(struct vy_recovery *recovery,
const struct vy_log_record *record);
static int
+vy_log_open(void);
+
+static int
vy_log_create(const struct vclock *vclock, struct vy_recovery *recovery);
int
@@ -731,6 +743,26 @@ err:
return NULL;
}
+/** Vylog thread main loop; runs until vy_log_free() stops it. */
+static int
+vy_log_thread_f(va_list ap)
+{
+	(void)ap;
+	struct cbus_endpoint endpoint;
+	cbus_endpoint_create(&endpoint, "vylog", fiber_schedule_cb, fiber());
+	/*
+	 * Reply via tx_prio, as the WAL-based writer did: replies
+	 * must not wait for a free fiber in the "tx" fiber pool.
+	 */
+	cpipe_create(&vy_log.tx_pipe, "tx_prio");
+	cbus_loop(&endpoint);
+
+	if (xlog_is_open(&vy_log.xlog))
+		xlog_close(&vy_log.xlog, false);
+	cpipe_destroy(&vy_log.tx_pipe);
+	return 0;
+}
+
void
vy_log_init(const char *dir)
{
@@ -740,7 +772,45 @@ vy_log_init(const char *dir)
region_create(&vy_log.pool, cord_slab_cache());
stailq_create(&vy_log.tx);
diag_create(&vy_log.tx_diag);
- wal_init_vy_log();
+ xlog_clear(&vy_log.xlog);
+
+ if (cord_costart(&vy_log.cord, "vinyl.log", vy_log_thread_f, NULL) != 0)
+ panic_syserror("failed to start vinyl log thread");
+
+ cpipe_create(&vy_log.pipe, "vylog");
+}
+
+struct vy_log_flush_msg {	/* tx -> vylog thread write request */
+	struct cbus_call_msg base;	/* must be first for the downcast */
+	struct journal_entry *entry;	/* set by vy_log_flush() */
+};
+
+/** Vylog thread: write msg->entry to the vylog file and flush it. */
+static int
+vy_log_flush_f(struct cbus_call_msg *base)
+{
+	struct xlog *file = &vy_log.xlog;
+	struct journal_entry *entry =
+		((struct vy_log_flush_msg *)base)->entry;
+	/* Opened lazily: vy_log_rotate() closes the current file. */
+	if (!xlog_is_open(file) && vy_log_open() < 0)
+		return -1;
+
+	xlog_tx_begin(file);
+	for (int k = 0; k < entry->n_rows; k++) {
+		entry->rows[k]->tm = ev_now(loop());
+		if (xlog_write_row(file, entry->rows[k]) < 0) {
+			/* Roll back rows written since xlog_tx_begin(). */
+			xlog_tx_rollback(file);
+			return -1;
+		}
+	}
+
+	/* Commit the batch, then flush the buffered data. */
+	if (xlog_tx_commit(file) < 0 || xlog_flush(file) < 0)
+		return -1;
+
+	return 0;
}
/**
@@ -798,10 +868,16 @@ vy_log_flush(void)
assert(i == tx_size);
/*
- * Do actual disk writes on behalf of the WAL
+ * Do actual disk writes on behalf of the vylog thread
* so as not to block the tx thread.
*/
- if (wal_write_vy_log(entry) != 0)
+ struct vy_log_flush_msg msg;
+ msg.entry = entry;
+ bool cancellable = fiber_set_cancellable(false);
+ int rc = cbus_call(&vy_log.pipe, &vy_log.tx_pipe, &msg.base,
+ vy_log_flush_f, NULL, TIMEOUT_INFINITY);
+ fiber_set_cancellable(cancellable);
+ if (rc != 0)
goto err;
/* Success. Free flushed records. */
@@ -817,14 +893,21 @@ err:
void
vy_log_free(void)
{
+	cbus_stop_loop(&vy_log.pipe);	/* stop cbus_loop() in vy_log_thread_f() */
+	if (cord_join(&vy_log.cord) != 0)
+		panic_syserror("failed to join vinyl log thread");
xdir_destroy(&vy_log.dir);
region_destroy(&vy_log.pool);
diag_destroy(&vy_log.tx_diag);
}
-int
-vy_log_open(struct xlog *xlog)
+/**
+ * Open current vy_log file.
+ */
+static int
+vy_log_open(void)
{
+ struct xlog *xlog = &vy_log.xlog;
/*
* Open the current log file or create a new one
* if it doesn't exist.
@@ -1022,6 +1105,15 @@ vy_log_rotate_f(va_list ap)
return vy_log_create(vclock, recovery);
}
+static int
+vy_log_close_f(struct cbus_call_msg *msg)
+{
+	(void)msg;	/* runs in the vylog thread via cbus_call() */
+	if (xlog_is_open(&vy_log.xlog))
+		xlog_close(&vy_log.xlog, false);	/* reopened lazily */
+	return 0;
+}
+
int
vy_log_rotate(const struct vclock *vclock)
{
@@ -1069,9 +1161,14 @@ vy_log_rotate(const struct vclock *vclock)
/*
* Success. Close the old log. The new one will be opened
- * automatically on the first write (see wal_write_vy_log()).
+ * automatically on the first write (see vy_log_flush_f()).
*/
- wal_rotate_vy_log();
+ struct cbus_call_msg msg;
+ bool cancellable = fiber_set_cancellable(false);
+ cbus_call(&vy_log.pipe, &vy_log.tx_pipe, &msg,
+ vy_log_close_f, NULL, TIMEOUT_INFINITY);
+ fiber_set_cancellable(cancellable);
+
vclock_copy(&vy_log.last_checkpoint, vclock);
/* Add the new vclock to the xdir so that we can track it. */
diff --git a/src/box/vy_log.h b/src/box/vy_log.h
index ee38c193..81f62706 100644
--- a/src/box/vy_log.h
+++ b/src/box/vy_log.h
@@ -439,12 +439,6 @@ void
vy_log_free(void);
/**
- * Open current vy_log file.
- */
-int
-vy_log_open(struct xlog *xlog);
-
-/**
* Rotate the metadata log. This function creates a new
* xlog file in the log directory having vclock @vclock
* and writes records required to recover active LSM trees.
diff --git a/src/box/wal.c b/src/box/wal.c
index 25bceb68..1f93e162 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -39,7 +39,6 @@
#include "xlog.h"
#include "xrow.h"
-#include "vy_log.h"
#include "cbus.h"
#include "coio_task.h"
#include "replication.h"
@@ -173,15 +172,6 @@ struct wal_msg {
struct vclock vclock;
};
-/**
- * Vinyl metadata log writer.
- */
-struct vy_log_writer {
- /** The metadata log file. */
- struct xlog xlog;
-};
-
-static struct vy_log_writer vy_log_writer;
static struct wal_writer wal_writer_singleton;
enum wal_mode
@@ -1115,9 +1105,6 @@ wal_writer_f(va_list ap)
if (xlog_is_open(&writer->current_wal))
xlog_close(&writer->current_wal, false);
- if (xlog_is_open(&vy_log_writer.xlog))
- xlog_close(&vy_log_writer.xlog, false);
-
cpipe_destroy(&writer->tx_prio_pipe);
return 0;
}
@@ -1198,72 +1185,6 @@ wal_write_in_wal_mode_none(struct journal *journal,
return vclock_sum(&writer->vclock);
}
-void
-wal_init_vy_log()
-{
- xlog_clear(&vy_log_writer.xlog);
-}
-
-struct wal_write_vy_log_msg
-{
- struct cbus_call_msg base;
- struct journal_entry *entry;
-};
-
-static int
-wal_write_vy_log_f(struct cbus_call_msg *msg)
-{
- struct journal_entry *entry =
- ((struct wal_write_vy_log_msg *)msg)->entry;
-
- if (! xlog_is_open(&vy_log_writer.xlog)) {
- if (vy_log_open(&vy_log_writer.xlog) < 0)
- return -1;
- }
-
- if (xlog_write_entry(&vy_log_writer.xlog, entry) < 0)
- return -1;
-
- if (xlog_flush(&vy_log_writer.xlog) < 0)
- return -1;
-
- return 0;
-}
-
-int
-wal_write_vy_log(struct journal_entry *entry)
-{
- struct wal_writer *writer = &wal_writer_singleton;
- struct wal_write_vy_log_msg msg;
- msg.entry= entry;
- bool cancellable = fiber_set_cancellable(false);
- int rc = cbus_call(&writer->wal_pipe, &writer->tx_prio_pipe,
- &msg.base, wal_write_vy_log_f, NULL,
- TIMEOUT_INFINITY);
- fiber_set_cancellable(cancellable);
- return rc;
-}
-
-static int
-wal_rotate_vy_log_f(struct cbus_call_msg *msg)
-{
- (void) msg;
- if (xlog_is_open(&vy_log_writer.xlog))
- xlog_close(&vy_log_writer.xlog, false);
- return 0;
-}
-
-void
-wal_rotate_vy_log()
-{
- struct wal_writer *writer = &wal_writer_singleton;
- struct cbus_call_msg msg;
- bool cancellable = fiber_set_cancellable(false);
- cbus_call(&writer->wal_pipe, &writer->tx_prio_pipe, &msg,
- wal_rotate_vy_log_f, NULL, TIMEOUT_INFINITY);
- fiber_set_cancellable(cancellable);
-}
-
static void
wal_watcher_notify(struct wal_watcher *watcher, unsigned events)
{
diff --git a/src/box/wal.h b/src/box/wal.h
index a88a3f23..8448f6d4 100644
--- a/src/box/wal.h
+++ b/src/box/wal.h
@@ -222,21 +222,6 @@ wal_set_checkpoint_threshold(int64_t threshold);
void
wal_collect_garbage(const struct vclock *vclock);
-void
-wal_init_vy_log();
-
-/**
- * Write xrows to the vinyl metadata log.
- */
-int
-wal_write_vy_log(struct journal_entry *req);
-
-/**
- * Rotate the vinyl metadata log.
- */
-void
-wal_rotate_vy_log();
-
#if defined(__cplusplus)
} /* extern "C" */
#endif /* defined(__cplusplus) */
--
2.11.0
More information about the Tarantool-patches
mailing list