[tarantool-patches] [PATCH v4 7/9] txn: introduce asynchronous txn commit
Vladimir Davydov
vdavydov.dev at gmail.com
Thu Jun 20 18:00:00 MSK 2019
On Thu, Jun 20, 2019 at 12:23:14AM +0300, Georgy Kirichenko wrote:
> This commit implements asynchronous transaction processing using
> txn_write. The method prepares a transaction and sends it to an journal
> without an yield until the transaction was finished. The transaction
> status could be controlled via on_commit/on_rollback triggers.
> In order to support asynchronous transaction journal_write method turned
> to an asynchronous one and now a transaction engine controls journal status
> using journal entry finalization callback.
>
> Prerequisites: #1254
> ---
> src/box/journal.c | 2 -
> src/box/journal.h | 10 +---
> src/box/txn.c | 137 ++++++++++++++++++++++++++++++----------------
> src/box/txn.h | 16 ++++++
> src/box/wal.c | 23 +++-----
> 5 files changed, 116 insertions(+), 72 deletions(-)
>
> diff --git a/src/box/journal.c b/src/box/journal.c
> index eb0db9af2..b4f3515f0 100644
> --- a/src/box/journal.c
> +++ b/src/box/journal.c
> @@ -30,7 +30,6 @@
> */
> #include "journal.h"
> #include <small/region.h>
> -#include <fiber.h>
> #include <diag.h>
>
> /**
> @@ -73,7 +72,6 @@ journal_entry_new(size_t n_rows, struct region *region,
> entry->approx_len = 0;
> entry->n_rows = n_rows;
> entry->res = -1;
> - entry->fiber = fiber();
> entry->on_done_cb = on_done_cb;
> entry->on_done_cb_data = on_done_cb_data;
> return entry;
> diff --git a/src/box/journal.h b/src/box/journal.h
> index b704b5c67..e85ff2c9e 100644
> --- a/src/box/journal.h
> +++ b/src/box/journal.h
> @@ -33,6 +33,7 @@
> #include <stdint.h>
> #include <stdbool.h>
> #include "salad/stailq.h"
> +#include "fiber.h"
>
> #if defined(__cplusplus)
> extern "C" {
> @@ -54,10 +55,6 @@ struct journal_entry {
> * the committed transaction, on error is -1
> */
> int64_t res;
> - /**
> - * The fiber issuing the request.
> - */
> - struct fiber *fiber;
I'd move fiber_wakeup to the callback in the same patch that introduces
the callback, but I guess it doesn't really matter.
> /**
> * A journal entry completion callback.
> */
> @@ -110,10 +107,9 @@ struct journal {
> extern struct journal *current_journal;
>
> /**
> - * Record a single entry.
> + * Send a single entry to write.
> *
> - * @return a log sequence number (vclock signature) of the entry
> - * or -1 on error.
> + * @return 0 if write was scheduled or -1 in case of an error.
> */
> static inline int64_t
> journal_write(struct journal_entry *entry)
This belongs to the previous patch.
> diff --git a/src/box/txn.c b/src/box/txn.c
> index 52e16f3e6..493bc2e3c 100644
> --- a/src/box/txn.c
> +++ b/src/box/txn.c
> @@ -199,6 +199,9 @@ txn_begin()
> txn->engine = NULL;
> txn->engine_tx = NULL;
> txn->psql_txn = NULL;
> + txn->entry = NULL;
> + txn->fiber = NULL;
> + txn->done = false;
> /* fiber_on_yield/fiber_on_stop initialized by engine on demand */
> fiber_set_txn(fiber(), txn);
> trigger_create(&txn->fiber_on_stop, txn_on_stop, NULL, NULL);
> @@ -359,7 +362,11 @@ txn_complete(struct txn *txn)
> panic("rollback trigger failed");
> }
> fiber_set_txn(fiber(), NULL);
> -
> + txn->done = true;
> + if (txn->fiber == NULL)
> + txn_free(txn);
> + else if (txn->fiber != fiber())
> + fiber_wakeup(txn->fiber);
Some comments would be really appreciated, something like:
if (txn->fiber != NULL) {
/*
* Wake up the initiating fiber - it will free
* the transaction memory. Note, this function
* may be called by the initiating fiber itself,
* e.g. for wal_mode=none mode, in which case
* we don't need to call fiber_wakeup().
*/
if (txn->fiber != fiber())
fiber_wakeup(txn->fiber);
} else {
/*
* This is either rollback, in which case we
* must free memory, or an async transaction
* completion. In the latter case the fiber that
* initiated the transaction is long gone so
* it's our responsibility to clean up.
*/
txn_free(txn);
}
> return;
> }
> /*
> @@ -368,6 +375,15 @@ txn_complete(struct txn *txn)
> */
> if (txn->engine != NULL)
> engine_commit(txn->engine, txn);
> +
> + ev_tstamp stop_tm = ev_monotonic_now(loop());
> + if (stop_tm - txn->start_tm > too_long_threshold) {
> + int n_rows = txn->n_new_rows + txn->n_applier_rows;
> + say_warn_ratelimited("too long WAL write: %d rows at "
> + "LSN %lld: %.3f sec", n_rows,
> + txn->signature - n_rows + 1,
> + stop_tm - txn->start_tm);
> + }
> /*
> * Some of triggers require for in_txn variable is set so
> * restore it for time a trigger is in progress.
> @@ -378,6 +394,7 @@ txn_complete(struct txn *txn)
> * may throw. In case an error has happened, there is
> * no other option but terminate.
> */
> + fiber_set_txn(fiber(), txn);
It's already set, just a few lines above. Added by the previous patch
AFAIR.
> if (txn->has_triggers &&
> trigger_run(&txn->on_commit, txn) != 0) {
> diag_log();
> @@ -386,35 +403,41 @@ txn_complete(struct txn *txn)
> }
>
> fiber_set_txn(fiber(), NULL);
> + txn->done = true;
> + if (txn->fiber == NULL)
> + txn_free(txn);
> + else if (txn->fiber != fiber())
> + fiber_wakeup(txn->fiber);
Please re-factor your code to avoid copy-and-paste.
I assume you could move it to txn_entry_done_cb.
> }
>
> static void
> txn_entry_done_cb(struct journal_entry *entry, void *data)
> {
> struct txn *txn = (struct txn *)data;
> + assert(txn->entry == entry);
> txn->signature = entry->res;
> txn_complete(txn);
> }
> @@ -467,32 +481,63 @@ txn_commit(struct txn *txn)
> * we have a bunch of IPROTO_NOP statements.
> */
> if (txn->engine != NULL) {
> - if (engine_prepare(txn->engine, txn) != 0)
> - goto fail;
> + if (engine_prepare(txn->engine, txn) != 0) {
> + return -1;
> + }
> }
> trigger_clear(&txn->fiber_on_stop);
> + return 0;
> +}
>
> - fiber_set_txn(fiber(), NULL);
> - if (txn->n_new_rows + txn->n_applier_rows > 0) {
> - txn->signature = txn_write_to_wal(txn);
> - if (txn->signature < 0)
> - return -1;
> - } else {
> - /*
> - * However there is noting to write to wal a completion
> - * should be fired.
> - */
> +/*
> + * Send a transaction to a journal.
> + */
> +int
> +txn_write(struct txn *txn)
I'd call it txn_commit_async, because it's just like txn_commit, but
asynchronous.
> +{
> + if (txn_prepare(txn) != 0)
> + goto fail;
> +
> + txn->start_tm = ev_monotonic_now(loop());
> + if (txn->n_new_rows + txn->n_applier_rows == 0) {
> + /* Nothing to do. */
> txn->signature = 0;
> txn_complete(txn);
> + return 0;
> }
>
> - txn_free(txn);
> + if (txn_journal_write(txn) != 0)
> + return -1;
> + fiber_set_txn(fiber(), NULL);
> return 0;
> fail:
> txn_rollback(txn);
> return -1;
> }
>
> +int
> +txn_commit(struct txn *txn)
> +{
> + txn->fiber = fiber();
> +
> + if (txn_write(txn) != 0)
> + return -1;
> + /*
> + * In case of non-yielding journal the transaction could already
> + * be done and there is nothing to wait in such cases.
> + */
> + if (!txn->done) {
> + bool cancellable = fiber_set_cancellable(false);
> + fiber_yield();
> + fiber_set_cancellable(cancellable);
> + }
> + int res = txn->signature >= 0? 0: -1;
> + if (res != 0)
> + diag_set(ClientError, ER_WAL_IO);
> + txn_free(txn);
> + return res;
> +}
> +
> void
> txn_rollback_stmt(struct txn *txn)
> {
> @@ -505,11 +550,9 @@ txn_rollback_stmt(struct txn *txn)
> void
> txn_rollback(struct txn *txn)
> {
> - assert(txn == in_txn());
> trigger_clear(&txn->fiber_on_stop);
> txn->signature = -1;
> txn_complete(txn);
> - txn_free(txn);
> }
>
> void
> diff --git a/src/box/txn.h b/src/box/txn.h
> index 569978ce9..bd6b695a9 100644
> --- a/src/box/txn.h
> +++ b/src/box/txn.h
> @@ -195,6 +195,16 @@ struct txn {
> /** Commit and rollback triggers */
> struct rlist on_commit, on_rollback;
> struct sql_txn *psql_txn;
> + /** Journal entry to control txn write. */
> + struct journal_entry *entry;
Why do you need to add txn->entry? Why not allocate it on txn->region?
You just need it to stay alive until the transaction is freed. You don't
need to store a pointer to it in txn AFAICS.
> + /** Transaction completion trigger. */
> + struct trigger entry_done;
This trigger is never used.
> + /** Timestampt of entry write start. */
> + ev_tstamp start_tm;
> + /* A fiber to wake up when transaction is finished. */
> + struct fiber *fiber;
It can be NULL. Please explain in what cases.
> + /* True when transaction is processed. */
> + bool done;
> };
>
> /* Pointer to the current transaction (if any) */
> @@ -228,6 +238,12 @@ txn_commit(struct txn *txn);
> void
> txn_rollback(struct txn *txn);
>
> +int
> +txn_write(struct txn *txn);
> +
> +int
> +txn_wait(struct txn *txn);
> +
txn_wait is declared here, but it isn't defined anywhere in this patch.
> /**
> * Roll back the transaction but keep the object around.
> * A special case for memtx transaction abort on yield. In this
> diff --git a/src/box/wal.c b/src/box/wal.c
> index 62b6391fd..582ae4598 100644
> --- a/src/box/wal.c
> +++ b/src/box/wal.c
> @@ -265,7 +265,6 @@ tx_schedule_f(va_list ap)
> struct journal_entry, fifo);
> if (req->on_done_cb != NULL)
> req->on_done_cb(req, req->on_done_cb_data);
> - fiber_wakeup(req->fiber);
> }
> writer->is_in_rollback = false;
> fiber_cond_wait(&writer->schedule_cond);
> @@ -274,7 +273,7 @@ tx_schedule_f(va_list ap)
> }
>
> /**
> - * Attach requests to a scheduling queue.
> + * Signal done condition.
> */
> static void
> tx_schedule_queue(struct stailq *queue)
> @@ -380,7 +379,8 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
> writer->wal_max_size = wal_max_size;
> writer->is_in_rollback = false;
> journal_create(&writer->base, wal_mode == WAL_NONE ?
> - wal_write_in_wal_mode_none : wal_write, NULL);
> + wal_write_in_wal_mode_none : wal_write,
> + NULL);
Why change this?
More information about the Tarantool-patches
mailing list