[tarantool-patches] [PATCH v4 7/9] txn: introduce asynchronous txn commit

Vladimir Davydov vdavydov.dev at gmail.com
Thu Jun 20 18:00:00 MSK 2019


On Thu, Jun 20, 2019 at 12:23:14AM +0300, Georgy Kirichenko wrote:
> This commit implements asynchronous transaction processing using
> txn_write. The method prepares a transaction and sends it to a journal
> without yielding to wait for the transaction to finish. The transaction
> status can be tracked via on_commit/on_rollback triggers.
> In order to support asynchronous transactions, the journal_write method
> is turned into an asynchronous one, and the transaction engine now
> controls journal status using a journal entry completion callback.
> 
> Prerequisites: #1254
> ---
>  src/box/journal.c |   2 -
>  src/box/journal.h |  10 +---
>  src/box/txn.c     | 137 ++++++++++++++++++++++++++++++----------------
>  src/box/txn.h     |  16 ++++++
>  src/box/wal.c     |  23 +++-----
>  5 files changed, 116 insertions(+), 72 deletions(-)
> 
> diff --git a/src/box/journal.c b/src/box/journal.c
> index eb0db9af2..b4f3515f0 100644
> --- a/src/box/journal.c
> +++ b/src/box/journal.c
> @@ -30,7 +30,6 @@
>   */
>  #include "journal.h"
>  #include <small/region.h>
> -#include <fiber.h>
>  #include <diag.h>
>  
>  /**
> @@ -73,7 +72,6 @@ journal_entry_new(size_t n_rows, struct region *region,
>  	entry->approx_len = 0;
>  	entry->n_rows = n_rows;
>  	entry->res = -1;
> -	entry->fiber = fiber();
>  	entry->on_done_cb = on_done_cb;
>  	entry->on_done_cb_data = on_done_cb_data;
>  	return entry;
> diff --git a/src/box/journal.h b/src/box/journal.h
> index b704b5c67..e85ff2c9e 100644
> --- a/src/box/journal.h
> +++ b/src/box/journal.h
> @@ -33,6 +33,7 @@
>  #include <stdint.h>
>  #include <stdbool.h>
>  #include "salad/stailq.h"
> +#include "fiber.h"
>  
>  #if defined(__cplusplus)
>  extern "C" {
> @@ -54,10 +55,6 @@ struct journal_entry {
>  	 * the committed transaction, on error is -1
>  	 */
>  	int64_t res;
> -	/**
> -	 * The fiber issuing the request.
> -	 */
> -	struct fiber *fiber;

I'd move fiber_wakeup to the callback in the same patch that introduces the
callback, but I guess it doesn't really matter.

>  	/**
>  	 * A journal entry completion callback.
>  	 */
> @@ -110,10 +107,9 @@ struct journal {
>  extern struct journal *current_journal;
>  
>  /**
> - * Record a single entry.
> + * Send a single entry to write.
>   *
> - * @return   a log sequence number (vclock signature) of the entry
> - *           or -1 on error.
> + * @return 0 if write was scheduled or -1 in case of an error.
>   */
>  static inline int64_t
>  journal_write(struct journal_entry *entry)

This belongs to the previous patch.

> diff --git a/src/box/txn.c b/src/box/txn.c
> index 52e16f3e6..493bc2e3c 100644
> --- a/src/box/txn.c
> +++ b/src/box/txn.c
> @@ -199,6 +199,9 @@ txn_begin()
>  	txn->engine = NULL;
>  	txn->engine_tx = NULL;
>  	txn->psql_txn = NULL;
> +	txn->entry = NULL;
> +	txn->fiber = NULL;
> +	txn->done = false;
>  	/* fiber_on_yield/fiber_on_stop initialized by engine on demand */
>  	fiber_set_txn(fiber(), txn);
>  	trigger_create(&txn->fiber_on_stop, txn_on_stop, NULL, NULL);
> @@ -359,7 +362,11 @@ txn_complete(struct txn *txn)
>  			panic("rollback trigger failed");
>  		}
>  		fiber_set_txn(fiber(), NULL);
> -
> +		txn->done = true;
> +		if (txn->fiber == NULL)
> +			txn_free(txn);
> +		else if (txn->fiber != fiber())
> +			fiber_wakeup(txn->fiber);

Some comments would be really appreciated, something like:

		if (txn->fiber != NULL) {
			/*
			 * Wake up the initiating fiber - it will free
			 * the transaction memory. Note, this function
			 * may be called by the initiating fiber itself,
			 * e.g. for wal_mode=none mode, in which case
			 * we don't need to call fiber_wakeup().
			 */
			if (txn->fiber != fiber())
				fiber_wakeup(txn->fiber);
		} else {
			/*
			 * This is either rollback, in which case we
			 * must free memory, or an async transaction
			 * completion. In the latter case the fiber that
			 * initiated the transaction is long gone so
			 * it's our responsibility to clean up.
			 */
			 txn_free(txn);
		}

>  		return;
>  	}
>  	/*
> @@ -368,6 +375,15 @@ txn_complete(struct txn *txn)
>  	 */
>  	if (txn->engine != NULL)
>  		engine_commit(txn->engine, txn);
> +
> +	ev_tstamp stop_tm = ev_monotonic_now(loop());
> +	if (stop_tm - txn->start_tm > too_long_threshold) {
> +		int n_rows = txn->n_new_rows + txn->n_applier_rows;
> +		say_warn_ratelimited("too long WAL write: %d rows at "
> +				     "LSN %lld: %.3f sec", n_rows,
> +				     txn->signature - n_rows + 1,
> +				     stop_tm - txn->start_tm);
> +	}
>  	/*
>  	 * Some of triggers require for in_txn variable is set so
>  	 * restore it for time a trigger is in progress.
> @@ -378,6 +394,7 @@ txn_complete(struct txn *txn)
>  	 * may throw. In case an error has happened, there is
>  	 * no other option but terminate.
>  	 */
> +	fiber_set_txn(fiber(), txn);

It's already set just a few lines above (added by the previous patch,
AFAIR).

>  	if (txn->has_triggers &&
>  	    trigger_run(&txn->on_commit, txn) != 0) {
>  		diag_log();
> @@ -386,35 +403,41 @@ txn_complete(struct txn *txn)
>  	}
>  
>  	fiber_set_txn(fiber(), NULL);
> +	txn->done = true;
> +	if (txn->fiber == NULL)
> +		txn_free(txn);
> +	else if (txn->fiber != fiber())
> +		fiber_wakeup(txn->fiber);

Please refactor the code to avoid this copy-and-paste.
I assume you could move it to txn_entry_done_cb.

>  }
>  
>  static void
>  txn_entry_done_cb(struct journal_entry *entry, void *data)
>  {
>  	struct txn *txn = (struct txn *)data;
> +	assert(txn->entry == entry);
>  	txn->signature = entry->res;
>  	txn_complete(txn);
>  }
> @@ -467,32 +481,63 @@ txn_commit(struct txn *txn)
>  	 * we have a bunch of IPROTO_NOP statements.
>  	 */
>  	if (txn->engine != NULL) {
> -		if (engine_prepare(txn->engine, txn) != 0)
> -			goto fail;
> +		if (engine_prepare(txn->engine, txn) != 0) {
> +			return -1;
> +		}
>  	}
>  	trigger_clear(&txn->fiber_on_stop);
> +	return 0;
> +}
>  
> -	fiber_set_txn(fiber(), NULL);
> -	if (txn->n_new_rows + txn->n_applier_rows > 0) {
> -		txn->signature = txn_write_to_wal(txn);
> -		if (txn->signature < 0)
> -			return -1;
> -	} else {
> -		/*
> -		 * However there is noting to write to wal a completion
> -		 * should be fired.
> -		 */
> +/*
> + * Send a transaction to a journal.
> + */
> +int
> +txn_write(struct txn *txn)

I'd call it txn_commit_async, because it's just like txn_commit but
asynchronous.

> +{
> +	if (txn_prepare(txn) != 0)
> +		goto fail;
> +
> +	txn->start_tm = ev_monotonic_now(loop());
> +	if (txn->n_new_rows + txn->n_applier_rows == 0) {
> +		/* Nothing to do. */
>  		txn->signature = 0;
>  		txn_complete(txn);
> +		return 0;
>  	}
>  
> -	txn_free(txn);
> +	if (txn_journal_write(txn) != 0)
> +		return -1;
> +	fiber_set_txn(fiber(), NULL);
>  	return 0;
>  fail:
>  	txn_rollback(txn);
>  	return -1;
>  }
>  
> +int
> +txn_commit(struct txn *txn)
> +{
> +	txn->fiber = fiber();
> +
> +	if (txn_write(txn) != 0)
> +		return -1;
> +	/*
> +	 * In case of non-yielding journal the transaction could already
> +	 * be done and there is nothing to wait in such cases.
> +	 */
> +	if (!txn->done) {
> +		bool cancellable = fiber_set_cancellable(false);
> +		fiber_yield();
> +		fiber_set_cancellable(cancellable);
> +	}
> +	int res = txn->signature >= 0? 0: -1;
> +	if (res != 0)
> +		diag_set(ClientError, ER_WAL_IO);
> +	txn_free(txn);
> +	return res;
> +}
> +
>  void
>  txn_rollback_stmt(struct txn *txn)
>  {
> @@ -505,11 +550,9 @@ txn_rollback_stmt(struct txn *txn)
>  void
>  txn_rollback(struct txn *txn)
>  {
> -	assert(txn == in_txn());
>  	trigger_clear(&txn->fiber_on_stop);
>  	txn->signature = -1;
>  	txn_complete(txn);
> -	txn_free(txn);
>  }
>  
>  void
> diff --git a/src/box/txn.h b/src/box/txn.h
> index 569978ce9..bd6b695a9 100644
> --- a/src/box/txn.h
> +++ b/src/box/txn.h
> @@ -195,6 +195,16 @@ struct txn {
>  	 /** Commit and rollback triggers */
>  	struct rlist on_commit, on_rollback;
>  	struct sql_txn *psql_txn;
> +	/** Journal entry to control txn write. */
> +	struct journal_entry *entry;

Why do you need to add txn->entry? Why not allocate it on txn->region?
You just need it to stay alive until the transaction is freed. You don't
need to store a pointer to it in txn AFAICS.

> +	/** Transaction completion trigger. */
> +	struct trigger entry_done;

This trigger is never used.

> +	/** Timestampt of entry write start. */
> +	ev_tstamp start_tm;
> +	/* A fiber to wake up when transaction is finished. */
> +	struct fiber *fiber;

It can be NULL. Please explain in what cases.

> +	/* True when transaction is processed. */
> +	bool done;
>  };
>  
>  /* Pointer to the current transaction (if any) */
> @@ -228,6 +238,12 @@ txn_commit(struct txn *txn);
>  void
>  txn_rollback(struct txn *txn);
>  
> +int
> +txn_write(struct txn *txn);
> +
> +int
> +txn_wait(struct txn *txn);
> +

txn_wait is declared here but never defined.

>  /**
>   * Roll back the transaction but keep the object around.
>   * A special case for memtx transaction abort on yield. In this
> diff --git a/src/box/wal.c b/src/box/wal.c
> index 62b6391fd..582ae4598 100644
> --- a/src/box/wal.c
> +++ b/src/box/wal.c
> @@ -265,7 +265,6 @@ tx_schedule_f(va_list ap)
>  						   struct journal_entry, fifo);
>  			if (req->on_done_cb != NULL)
>  				req->on_done_cb(req, req->on_done_cb_data);
> -			fiber_wakeup(req->fiber);
>  		}
>  		writer->is_in_rollback = false;
>  		fiber_cond_wait(&writer->schedule_cond);
> @@ -274,7 +273,7 @@ tx_schedule_f(va_list ap)
>  }
>  
>  /**
> - * Attach requests to a scheduling queue.
> + * Signal done condition.
>   */
>  static void
>  tx_schedule_queue(struct stailq *queue)
> @@ -380,7 +379,8 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
>  	writer->wal_max_size = wal_max_size;
>  	writer->is_in_rollback = false;
>  	journal_create(&writer->base, wal_mode == WAL_NONE ?
> -		       wal_write_in_wal_mode_none : wal_write, NULL);
> +		       wal_write_in_wal_mode_none : wal_write,
> +		       NULL);

Why change this?



More information about the Tarantool-patches mailing list