[tarantool-patches] Re: [PATCH 02/10] vinyl: add a separate thread for vylog

Konstantin Osipov kostja at tarantool.org
Sat May 18 21:39:58 MSK 2019


* Vladimir Davydov <vdavydov.dev at gmail.com> [19/05/17 17:54]:

We need an infrastructure to run an opaque piece of code in any
OS thread, rather than creating a new thread every time we have an
itch to refactor something.

If you wish to run vylog in its own thread, fine, but the WAL thread
has plenty of capacity to host ten vylog threads - please use it.

> Historically, we have used the WAL thread for writing vylog files,
> because, I guess, we didn't want to introduce a separate thread.
> However, that design decision turned out to be quite dubious:
> 
>  - vy_log (vy_log.c) calls vy_log_writer (wal.c) while vy_log_writer
>    calls back to vy_log. That is, we have a piece of logic split
>    crudely between two files, which makes the code difficult to follow
>    and just looks ugly.
> 
>  - We can't make vy_log part of vy_env because of this relationship.
>    In fact, vy_log is the last singleton in the whole vy implementation:
>    everything else is wrapped neatly (well, not quite, but still) in
>    the vy_env struct.
> 
>  - We can't kill the infamous vy_log.latch, which is a prerequisite for
>    transactional DDL. The latch is needed to synchronize vy_log readers
>    and writers. Ideally, we would read vy_log in the same thread where
>    we write it, which would eliminate the need for any kind of
>    synchronization. However, we currently can't do that, because
>    reading vy_log is quite a heavy operation - it may stall WAL writes
>    and thus badly affect latency. So we have to do it from a coio
>    thread.
> 
> That being said, it's time to move the vy_log writer back to where it
> belongs - vy_log.c, running in a separate thread. This patch does just
> that. It doesn't change the implementation to fix the flaws enumerated
> above - this will be done by the following patches.
> ---
>  src/box/vy_log.c | 113 +++++++++++++++++++++++++++++++++++++++++++++++++++----
>  src/box/vy_log.h |   6 ---
>  src/box/wal.c    |  79 --------------------------------------
>  src/box/wal.h    |  15 --------
>  4 files changed, 105 insertions(+), 108 deletions(-)
> 
> diff --git a/src/box/vy_log.c b/src/box/vy_log.c
> index edd61b33..25ab73fd 100644
> --- a/src/box/vy_log.c
> +++ b/src/box/vy_log.c
> @@ -46,19 +46,20 @@
>  #include <small/rlist.h>
>  
>  #include "assoc.h"
> +#include "cbus.h"
>  #include "coio_task.h"
>  #include "diag.h"
>  #include "errcode.h"
>  #include "errinj.h"
>  #include "fiber.h"
>  #include "iproto_constants.h" /* IPROTO_INSERT */
> +#include "journal.h"
>  #include "key_def.h"
>  #include "latch.h"
>  #include "replication.h" /* INSTANCE_UUID */
>  #include "salad/stailq.h"
>  #include "say.h"
>  #include "tt_static.h"
> -#include "wal.h"
>  #include "vclock.h"
>  #include "xlog.h"
>  #include "xrow.h"
> @@ -178,6 +179,14 @@ struct vy_log {
>  	 * only relevant if @tx_failed is set.
>  	 */
>  	struct diag tx_diag;
> +	/** Vylog file. */
> +	struct xlog xlog;
> +	/** Vylog IO thread. */
> +	struct cord cord;
> +	/** Pipe to the vylog thread. */
> +	struct cpipe pipe;
> +	/** Returning pipe from the vylog thread back to tx. */
> +	struct cpipe tx_pipe;
>  };
>  static struct vy_log vy_log;
>  
> @@ -189,6 +198,9 @@ vy_recovery_process_record(struct vy_recovery *recovery,
>  			   const struct vy_log_record *record);
>  
>  static int
> +vy_log_open(void);
> +
> +static int
>  vy_log_create(const struct vclock *vclock, struct vy_recovery *recovery);
>  
>  int
> @@ -731,6 +743,26 @@ err:
>  	return NULL;
>  }
>  
> +/** Vylog thread main loop. */
> +static int
> +vy_log_thread_f(va_list ap)
> +{
> +	(void)ap;
> +
> +	struct cbus_endpoint endpoint;
> +	cbus_endpoint_create(&endpoint, "vylog", fiber_schedule_cb, fiber());
> +
> +	cpipe_create(&vy_log.tx_pipe, "tx");
> +
> +	cbus_loop(&endpoint);
> +
> +	if (xlog_is_open(&vy_log.xlog))
> +		xlog_close(&vy_log.xlog, false);
> +
> +	cpipe_destroy(&vy_log.tx_pipe);
> +	return 0;
> +}
> +
>  void
>  vy_log_init(const char *dir)
>  {
> @@ -740,7 +772,45 @@ vy_log_init(const char *dir)
>  	region_create(&vy_log.pool, cord_slab_cache());
>  	stailq_create(&vy_log.tx);
>  	diag_create(&vy_log.tx_diag);
> -	wal_init_vy_log();
> +	xlog_clear(&vy_log.xlog);
> +
> +	if (cord_costart(&vy_log.cord, "vinyl.log", vy_log_thread_f, NULL) != 0)
> +		panic_syserror("failed to start vinyl log thread");
> +
> +	cpipe_create(&vy_log.pipe, "vylog");
> +}
> +
> +struct vy_log_flush_msg {
> +	struct cbus_call_msg base;
> +	struct journal_entry *entry;
> +};
> +
> +static int
> +vy_log_flush_f(struct cbus_call_msg *base)
> +{
> +	struct vy_log_flush_msg *msg = (struct vy_log_flush_msg *)base;
> +	struct journal_entry *entry = msg->entry;
> +	struct xlog *xlog = &vy_log.xlog;
> +
> +	if (!xlog_is_open(xlog)) {
> +		if (vy_log_open() < 0)
> +			return -1;
> +	}
> +
> +	xlog_tx_begin(xlog);
> +	for (int i = 0; i < entry->n_rows; ++i) {
> +		entry->rows[i]->tm = ev_now(loop());
> +		if (xlog_write_row(xlog, entry->rows[i]) < 0) {
> +			xlog_tx_rollback(xlog);
> +			return -1;
> +		}
> +	}
> +
> +	if (xlog_tx_commit(xlog) < 0 ||
> +	    xlog_flush(xlog) < 0)
> +		return -1;
> +
> +	return 0;
>  }
>  
>  /**
> @@ -798,10 +868,16 @@ vy_log_flush(void)
>  	assert(i == tx_size);
>  
>  	/*
> -	 * Do actual disk writes on behalf of the WAL
> +	 * Do actual disk writes on behalf of the vylog thread
>  	 * so as not to block the tx thread.
>  	 */
> -	if (wal_write_vy_log(entry) != 0)
> +	struct vy_log_flush_msg msg;
> +	msg.entry = entry;
> +	bool cancellable = fiber_set_cancellable(false);
> +	int rc = cbus_call(&vy_log.pipe, &vy_log.tx_pipe, &msg.base,
> +			   vy_log_flush_f, NULL, TIMEOUT_INFINITY);
> +	fiber_set_cancellable(cancellable);
> +	if (rc != 0)
>  		goto err;
>  
>  	/* Success. Free flushed records. */
> @@ -817,14 +893,21 @@ err:
>  void
>  vy_log_free(void)
>  {
> +	cbus_stop_loop(&vy_log.pipe);
> +	if (cord_join(&vy_log.cord) != 0)
> +		panic_syserror("failed to join vinyl log thread");
>  	xdir_destroy(&vy_log.dir);
>  	region_destroy(&vy_log.pool);
>  	diag_destroy(&vy_log.tx_diag);
>  }
>  
> -int
> -vy_log_open(struct xlog *xlog)
> +/**
> + * Open current vy_log file.
> + */
> +static int
> +vy_log_open(void)
>  {
> +	struct xlog *xlog = &vy_log.xlog;
>  	/*
>  	 * Open the current log file or create a new one
>  	 * if it doesn't exist.
> @@ -1022,6 +1105,15 @@ vy_log_rotate_f(va_list ap)
>  	return vy_log_create(vclock, recovery);
>  }
>  
> +static int
> +vy_log_close_f(struct cbus_call_msg *msg)
> +{
> +	(void)msg;
> +	if (xlog_is_open(&vy_log.xlog))
> +		xlog_close(&vy_log.xlog, false);
> +	return 0;
> +}
> +
>  int
>  vy_log_rotate(const struct vclock *vclock)
>  {
> @@ -1069,9 +1161,14 @@ vy_log_rotate(const struct vclock *vclock)
>  
>  	/*
>  	 * Success. Close the old log. The new one will be opened
> -	 * automatically on the first write (see wal_write_vy_log()).
> +	 * automatically on the first write (see vy_log_flush_f()).
>  	 */
> -	wal_rotate_vy_log();
> +	struct cbus_call_msg msg;
> +	bool cancellable = fiber_set_cancellable(false);
> +	cbus_call(&vy_log.pipe, &vy_log.tx_pipe, &msg,
> +		  vy_log_close_f, NULL, TIMEOUT_INFINITY);
> +	fiber_set_cancellable(cancellable);
> +
>  	vclock_copy(&vy_log.last_checkpoint, vclock);
>  
>  	/* Add the new vclock to the xdir so that we can track it. */
> diff --git a/src/box/vy_log.h b/src/box/vy_log.h
> index ee38c193..81f62706 100644
> --- a/src/box/vy_log.h
> +++ b/src/box/vy_log.h
> @@ -439,12 +439,6 @@ void
>  vy_log_free(void);
>  
>  /**
> - * Open current vy_log file.
> - */
> -int
> -vy_log_open(struct xlog *xlog);
> -
> -/**
>   * Rotate the metadata log. This function creates a new
>   * xlog file in the log directory having vclock @vclock
>   * and writes records required to recover active LSM trees.
> diff --git a/src/box/wal.c b/src/box/wal.c
> index 25bceb68..1f93e162 100644
> --- a/src/box/wal.c
> +++ b/src/box/wal.c
> @@ -39,7 +39,6 @@
>  
>  #include "xlog.h"
>  #include "xrow.h"
> -#include "vy_log.h"
>  #include "cbus.h"
>  #include "coio_task.h"
>  #include "replication.h"
> @@ -173,15 +172,6 @@ struct wal_msg {
>  	struct vclock vclock;
>  };
>  
> -/**
> - * Vinyl metadata log writer.
> - */
> -struct vy_log_writer {
> -	/** The metadata log file. */
> -	struct xlog xlog;
> -};
> -
> -static struct vy_log_writer vy_log_writer;
>  static struct wal_writer wal_writer_singleton;
>  
>  enum wal_mode
> @@ -1115,9 +1105,6 @@ wal_writer_f(va_list ap)
>  	if (xlog_is_open(&writer->current_wal))
>  		xlog_close(&writer->current_wal, false);
>  
> -	if (xlog_is_open(&vy_log_writer.xlog))
> -		xlog_close(&vy_log_writer.xlog, false);
> -
>  	cpipe_destroy(&writer->tx_prio_pipe);
>  	return 0;
>  }
> @@ -1198,72 +1185,6 @@ wal_write_in_wal_mode_none(struct journal *journal,
>  	return vclock_sum(&writer->vclock);
>  }
>  
> -void
> -wal_init_vy_log()
> -{
> -	xlog_clear(&vy_log_writer.xlog);
> -}
> -
> -struct wal_write_vy_log_msg
> -{
> -	struct cbus_call_msg base;
> -	struct journal_entry *entry;
> -};
> -
> -static int
> -wal_write_vy_log_f(struct cbus_call_msg *msg)
> -{
> -	struct journal_entry *entry =
> -		((struct wal_write_vy_log_msg *)msg)->entry;
> -
> -	if (! xlog_is_open(&vy_log_writer.xlog)) {
> -		if (vy_log_open(&vy_log_writer.xlog) < 0)
> -			return -1;
> -	}
> -
> -	if (xlog_write_entry(&vy_log_writer.xlog, entry) < 0)
> -		return -1;
> -
> -	if (xlog_flush(&vy_log_writer.xlog) < 0)
> -		return -1;
> -
> -	return 0;
> -}
> -
> -int
> -wal_write_vy_log(struct journal_entry *entry)
> -{
> -	struct wal_writer *writer = &wal_writer_singleton;
> -	struct wal_write_vy_log_msg msg;
> -	msg.entry= entry;
> -	bool cancellable = fiber_set_cancellable(false);
> -	int rc = cbus_call(&writer->wal_pipe, &writer->tx_prio_pipe,
> -			   &msg.base, wal_write_vy_log_f, NULL,
> -			   TIMEOUT_INFINITY);
> -	fiber_set_cancellable(cancellable);
> -	return rc;
> -}
> -
> -static int
> -wal_rotate_vy_log_f(struct cbus_call_msg *msg)
> -{
> -	(void) msg;
> -	if (xlog_is_open(&vy_log_writer.xlog))
> -		xlog_close(&vy_log_writer.xlog, false);
> -	return 0;
> -}
> -
> -void
> -wal_rotate_vy_log()
> -{
> -	struct wal_writer *writer = &wal_writer_singleton;
> -	struct cbus_call_msg msg;
> -	bool cancellable = fiber_set_cancellable(false);
> -	cbus_call(&writer->wal_pipe, &writer->tx_prio_pipe, &msg,
> -		  wal_rotate_vy_log_f, NULL, TIMEOUT_INFINITY);
> -	fiber_set_cancellable(cancellable);
> -}
> -
>  static void
>  wal_watcher_notify(struct wal_watcher *watcher, unsigned events)
>  {
> diff --git a/src/box/wal.h b/src/box/wal.h
> index a88a3f23..8448f6d4 100644
> --- a/src/box/wal.h
> +++ b/src/box/wal.h
> @@ -222,21 +222,6 @@ wal_set_checkpoint_threshold(int64_t threshold);
>  void
>  wal_collect_garbage(const struct vclock *vclock);
>  
> -void
> -wal_init_vy_log();
> -
> -/**
> - * Write xrows to the vinyl metadata log.
> - */
> -int
> -wal_write_vy_log(struct journal_entry *req);
> -
> -/**
> - * Rotate the vinyl metadata log.
> - */
> -void
> -wal_rotate_vy_log();
> -
>  #if defined(__cplusplus)
>  } /* extern "C" */
>  #endif /* defined(__cplusplus) */
> -- 
> 2.11.0
> 

-- 
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32




More information about the Tarantool-patches mailing list