[PATCH 02/10] vinyl: add a separate thread for vylog

Vladimir Davydov vdavydov.dev at gmail.com
Fri May 17 17:52:36 MSK 2019


Historically, we use the WAL thread for writing vylog files, because,
I guess, we didn't want to introduce a separate thread. However, that
design decision turned out to be quite dubious:

 - vy_log (vy_log.c) calls vy_log_writer (wal.c) while vy_log_writer
   calls back to vy_log. That is we have a piece of logic split crudely
   between two files, which makes the code difficult to follow and just
   looks ugly.

 - We can't make vy_log part of vy_env because of this relationship.
   In fact, vy_log is the last singleton in the whole vy implementation:
   everything else is wrapped neatly (well, not quite, but still) in
   vy_env struct.

 - We can't kill the infamous vy_log.latch, which is a prerequisite for
   transactional DDL. The latch is needed to sync between vy_log readers
   and writers. You see, currently we can't read vy_log in the same
   thread where we write it, which would eliminate the need for any kind
   of synchronization, because vy_log read is quite a heavy operation -
   it may stall WAL writes and thus badly affect latency. So we have to
   do it from a coio thread.

That being said, it's time to move vy_log writer back to where it
belongs - vy_log.c, separate thread. This patch does just that. It
doesn't patch the implementation to fix the flaws enumerated above -
this will be done by following patches.
---
 src/box/vy_log.c | 113 +++++++++++++++++++++++++++++++++++++++++++++++++++----
 src/box/vy_log.h |   6 ---
 src/box/wal.c    |  79 --------------------------------------
 src/box/wal.h    |  15 --------
 4 files changed, 105 insertions(+), 108 deletions(-)

diff --git a/src/box/vy_log.c b/src/box/vy_log.c
index edd61b33..25ab73fd 100644
--- a/src/box/vy_log.c
+++ b/src/box/vy_log.c
@@ -46,19 +46,20 @@
 #include <small/rlist.h>
 
 #include "assoc.h"
+#include "cbus.h"
 #include "coio_task.h"
 #include "diag.h"
 #include "errcode.h"
 #include "errinj.h"
 #include "fiber.h"
 #include "iproto_constants.h" /* IPROTO_INSERT */
+#include "journal.h"
 #include "key_def.h"
 #include "latch.h"
 #include "replication.h" /* INSTANCE_UUID */
 #include "salad/stailq.h"
 #include "say.h"
 #include "tt_static.h"
-#include "wal.h"
 #include "vclock.h"
 #include "xlog.h"
 #include "xrow.h"
@@ -178,6 +179,14 @@ struct vy_log {
 	 * only relevant if @tx_failed is set.
 	 */
 	struct diag tx_diag;
+	/** Vylog file. */
+	struct xlog xlog;
+	/** Vylog IO thread. */
+	struct cord cord;
+	/** Pipe to the vylog thread. */
+	struct cpipe pipe;
+	/** Returning pipe from the vylog thread back to tx. */
+	struct cpipe tx_pipe;
 };
 static struct vy_log vy_log;
 
@@ -189,6 +198,9 @@ vy_recovery_process_record(struct vy_recovery *recovery,
 			   const struct vy_log_record *record);
 
 static int
+vy_log_open(void);
+
+static int
 vy_log_create(const struct vclock *vclock, struct vy_recovery *recovery);
 
 int
@@ -731,6 +743,26 @@ err:
 	return NULL;
 }
 
+/** Vylog thread main loop. */
+static int
+vy_log_thread_f(va_list ap)
+{
+	(void)ap;
+
+	struct cbus_endpoint endpoint;
+	cbus_endpoint_create(&endpoint, "vylog", fiber_schedule_cb, fiber());
+
+	cpipe_create(&vy_log.tx_pipe, "tx");
+
+	cbus_loop(&endpoint);
+
+	if (xlog_is_open(&vy_log.xlog))
+		xlog_close(&vy_log.xlog, false);
+
+	cpipe_destroy(&vy_log.tx_pipe);
+	return 0;
+}
+
 void
 vy_log_init(const char *dir)
 {
@@ -740,7 +772,45 @@ vy_log_init(const char *dir)
 	region_create(&vy_log.pool, cord_slab_cache());
 	stailq_create(&vy_log.tx);
 	diag_create(&vy_log.tx_diag);
-	wal_init_vy_log();
+	xlog_clear(&vy_log.xlog);
+
+	if (cord_costart(&vy_log.cord, "vinyl.log", vy_log_thread_f, NULL) != 0)
+		panic_syserror("failed to start vinyl log thread");
+
+	cpipe_create(&vy_log.pipe, "vylog");
+}
+
+struct vy_log_flush_msg {
+	struct cbus_call_msg base;
+	struct journal_entry *entry;
+};
+
+static int
+vy_log_flush_f(struct cbus_call_msg *base)
+{
+	struct vy_log_flush_msg *msg = (struct vy_log_flush_msg *)base;
+	struct journal_entry *entry = msg->entry;
+	struct xlog *xlog = &vy_log.xlog;
+
+	if (!xlog_is_open(xlog)) {
+		if (vy_log_open() < 0)
+			return -1;
+	}
+
+	xlog_tx_begin(xlog);
+	for (int i = 0; i < entry->n_rows; ++i) {
+		entry->rows[i]->tm = ev_now(loop());
+		if (xlog_write_row(xlog, entry->rows[i]) < 0) {
+			xlog_tx_rollback(xlog);
+			return -1;
+		}
+	}
+
+	if (xlog_tx_commit(xlog) < 0 ||
+	    xlog_flush(xlog) < 0)
+		return -1;
+
+	return 0;
 }
 
 /**
@@ -798,10 +868,16 @@ vy_log_flush(void)
 	assert(i == tx_size);
 
 	/*
-	 * Do actual disk writes on behalf of the WAL
+	 * Do actual disk writes on behalf of the vylog thread
 	 * so as not to block the tx thread.
 	 */
-	if (wal_write_vy_log(entry) != 0)
+	struct vy_log_flush_msg msg;
+	msg.entry = entry;
+	bool cancellable = fiber_set_cancellable(false);
+	int rc = cbus_call(&vy_log.pipe, &vy_log.tx_pipe, &msg.base,
+			   vy_log_flush_f, NULL, TIMEOUT_INFINITY);
+	fiber_set_cancellable(cancellable);
+	if (rc != 0)
 		goto err;
 
 	/* Success. Free flushed records. */
@@ -817,14 +893,21 @@ err:
 void
 vy_log_free(void)
 {
+	cbus_stop_loop(&vy_log.pipe);
+	if (cord_join(&vy_log.cord) != 0)
+		panic_syserror("failed to join vinyl log thread");
 	xdir_destroy(&vy_log.dir);
 	region_destroy(&vy_log.pool);
 	diag_destroy(&vy_log.tx_diag);
 }
 
-int
-vy_log_open(struct xlog *xlog)
+/**
+ * Open current vy_log file.
+ */
+static int
+vy_log_open(void)
 {
+	struct xlog *xlog = &vy_log.xlog;
 	/*
 	 * Open the current log file or create a new one
 	 * if it doesn't exist.
@@ -1022,6 +1105,15 @@ vy_log_rotate_f(va_list ap)
 	return vy_log_create(vclock, recovery);
 }
 
+static int
+vy_log_close_f(struct cbus_call_msg *msg)
+{
+	(void)msg;
+	if (xlog_is_open(&vy_log.xlog))
+		xlog_close(&vy_log.xlog, false);
+	return 0;
+}
+
 int
 vy_log_rotate(const struct vclock *vclock)
 {
@@ -1069,9 +1161,14 @@ vy_log_rotate(const struct vclock *vclock)
 
 	/*
 	 * Success. Close the old log. The new one will be opened
-	 * automatically on the first write (see wal_write_vy_log()).
+	 * automatically on the first write (see vy_log_flush_f()).
 	 */
-	wal_rotate_vy_log();
+	struct cbus_call_msg msg;
+	bool cancellable = fiber_set_cancellable(false);
+	cbus_call(&vy_log.pipe, &vy_log.tx_pipe, &msg,
+		  vy_log_close_f, NULL, TIMEOUT_INFINITY);
+	fiber_set_cancellable(cancellable);
+
 	vclock_copy(&vy_log.last_checkpoint, vclock);
 
 	/* Add the new vclock to the xdir so that we can track it. */
diff --git a/src/box/vy_log.h b/src/box/vy_log.h
index ee38c193..81f62706 100644
--- a/src/box/vy_log.h
+++ b/src/box/vy_log.h
@@ -439,12 +439,6 @@ void
 vy_log_free(void);
 
 /**
- * Open current vy_log file.
- */
-int
-vy_log_open(struct xlog *xlog);
-
-/**
  * Rotate the metadata log. This function creates a new
  * xlog file in the log directory having vclock @vclock
  * and writes records required to recover active LSM trees.
diff --git a/src/box/wal.c b/src/box/wal.c
index 25bceb68..1f93e162 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -39,7 +39,6 @@
 
 #include "xlog.h"
 #include "xrow.h"
-#include "vy_log.h"
 #include "cbus.h"
 #include "coio_task.h"
 #include "replication.h"
@@ -173,15 +172,6 @@ struct wal_msg {
 	struct vclock vclock;
 };
 
-/**
- * Vinyl metadata log writer.
- */
-struct vy_log_writer {
-	/** The metadata log file. */
-	struct xlog xlog;
-};
-
-static struct vy_log_writer vy_log_writer;
 static struct wal_writer wal_writer_singleton;
 
 enum wal_mode
@@ -1115,9 +1105,6 @@ wal_writer_f(va_list ap)
 	if (xlog_is_open(&writer->current_wal))
 		xlog_close(&writer->current_wal, false);
 
-	if (xlog_is_open(&vy_log_writer.xlog))
-		xlog_close(&vy_log_writer.xlog, false);
-
 	cpipe_destroy(&writer->tx_prio_pipe);
 	return 0;
 }
@@ -1198,72 +1185,6 @@ wal_write_in_wal_mode_none(struct journal *journal,
 	return vclock_sum(&writer->vclock);
 }
 
-void
-wal_init_vy_log()
-{
-	xlog_clear(&vy_log_writer.xlog);
-}
-
-struct wal_write_vy_log_msg
-{
-	struct cbus_call_msg base;
-	struct journal_entry *entry;
-};
-
-static int
-wal_write_vy_log_f(struct cbus_call_msg *msg)
-{
-	struct journal_entry *entry =
-		((struct wal_write_vy_log_msg *)msg)->entry;
-
-	if (! xlog_is_open(&vy_log_writer.xlog)) {
-		if (vy_log_open(&vy_log_writer.xlog) < 0)
-			return -1;
-	}
-
-	if (xlog_write_entry(&vy_log_writer.xlog, entry) < 0)
-		return -1;
-
-	if (xlog_flush(&vy_log_writer.xlog) < 0)
-		return -1;
-
-	return 0;
-}
-
-int
-wal_write_vy_log(struct journal_entry *entry)
-{
-	struct wal_writer *writer = &wal_writer_singleton;
-	struct wal_write_vy_log_msg msg;
-	msg.entry= entry;
-	bool cancellable = fiber_set_cancellable(false);
-	int rc = cbus_call(&writer->wal_pipe, &writer->tx_prio_pipe,
-			   &msg.base, wal_write_vy_log_f, NULL,
-			   TIMEOUT_INFINITY);
-	fiber_set_cancellable(cancellable);
-	return rc;
-}
-
-static int
-wal_rotate_vy_log_f(struct cbus_call_msg *msg)
-{
-	(void) msg;
-	if (xlog_is_open(&vy_log_writer.xlog))
-		xlog_close(&vy_log_writer.xlog, false);
-	return 0;
-}
-
-void
-wal_rotate_vy_log()
-{
-	struct wal_writer *writer = &wal_writer_singleton;
-	struct cbus_call_msg msg;
-	bool cancellable = fiber_set_cancellable(false);
-	cbus_call(&writer->wal_pipe, &writer->tx_prio_pipe, &msg,
-		  wal_rotate_vy_log_f, NULL, TIMEOUT_INFINITY);
-	fiber_set_cancellable(cancellable);
-}
-
 static void
 wal_watcher_notify(struct wal_watcher *watcher, unsigned events)
 {
diff --git a/src/box/wal.h b/src/box/wal.h
index a88a3f23..8448f6d4 100644
--- a/src/box/wal.h
+++ b/src/box/wal.h
@@ -222,21 +222,6 @@ wal_set_checkpoint_threshold(int64_t threshold);
 void
 wal_collect_garbage(const struct vclock *vclock);
 
-void
-wal_init_vy_log();
-
-/**
- * Write xrows to the vinyl metadata log.
- */
-int
-wal_write_vy_log(struct journal_entry *req);
-
-/**
- * Rotate the vinyl metadata log.
- */
-void
-wal_rotate_vy_log();
-
 #if defined(__cplusplus)
 } /* extern "C" */
 #endif /* defined(__cplusplus) */
-- 
2.11.0




More information about the Tarantool-patches mailing list