From mboxrd@z Thu Jan 1 00:00:00 1970
Return-Path:
Date: Thu, 20 Jun 2019 18:00:00 +0300
From: Vladimir Davydov
Subject: Re: [tarantool-patches] [PATCH v4 7/9] txn: introduce asynchronous txn commit
Message-ID: <20190620150000.kmtxnrua3iucntbf@esperanza>
References:
MIME-Version: 1.0
Content-Type: text/plain; charset=us-ascii
Content-Disposition: inline
In-Reply-To:
To: Georgy Kirichenko
Cc: tarantool-patches@freelists.org
List-ID:

On Thu, Jun 20, 2019 at 12:23:14AM +0300, Georgy Kirichenko wrote:
> This commit implements asynchronous transaction processing using
> txn_write. The method prepares a transaction and sends it to an journal
> without an yield until the transaction was finished. The transaction
> status could be controlled via on_commit/on_rollback triggers.
> In order to support asynchronous transaction journal_write method turned
> to an asynchronous one and now a transaction engine controls journal status
> using journal entry finalization callback.
>
> Prerequisites: #1254
> ---
>  src/box/journal.c |   2 -
>  src/box/journal.h |  10 +---
>  src/box/txn.c     | 137 ++++++++++++++++++++++++++++++----------------
>  src/box/txn.h     |  16 ++++++
>  src/box/wal.c     |  23 +++-----
>  5 files changed, 116 insertions(+), 72 deletions(-)
>
> diff --git a/src/box/journal.c b/src/box/journal.c
> index eb0db9af2..b4f3515f0 100644
> --- a/src/box/journal.c
> +++ b/src/box/journal.c
> @@ -30,7 +30,6 @@
>   */
>  #include "journal.h"
>  #include
> -#include
>  #include
>
>  /**
> @@ -73,7 +72,6 @@ journal_entry_new(size_t n_rows, struct region *region,
>  	entry->approx_len = 0;
>  	entry->n_rows = n_rows;
>  	entry->res = -1;
> -	entry->fiber = fiber();
>  	entry->on_done_cb = on_done_cb;
>  	entry->on_done_cb_data = on_done_cb_data;
>  	return entry;
> diff --git a/src/box/journal.h b/src/box/journal.h
> index b704b5c67..e85ff2c9e 100644
> --- a/src/box/journal.h
> +++ b/src/box/journal.h
> @@ -33,6 +33,7 @@
>  #include
>  #include
>  #include "salad/stailq.h"
> +#include "fiber.h"
>
>  #if defined(__cplusplus)
>  extern "C" {
> @@ -54,10 +55,6 @@ struct journal_entry {
>  	 * the committed transaction, on error is -1
>  	 */
>  	int64_t res;
> -	/**
> -	 * The fiber issuing the request.
> -	 */
> -	struct fiber *fiber;

I'd move fiber_wakeup to the callback in the same patch that introduces
the callback, but I guess it doesn't really matter.

>  	/**
>  	 * A journal entry completion callback.
>  	 */
> @@ -110,10 +107,9 @@ struct journal {
>  extern struct journal *current_journal;
>
>  /**
> - * Record a single entry.
> + * Send a single entry to write.
>   *
> - * @return a log sequence number (vclock signature) of the entry
> - * or -1 on error.
> + * @return 0 if write was scheduled or -1 in case of an error.
>   */
>  static inline int64_t
>  journal_write(struct journal_entry *entry)

This belongs to the previous patch.

> diff --git a/src/box/txn.c b/src/box/txn.c
> index 52e16f3e6..493bc2e3c 100644
> --- a/src/box/txn.c
> +++ b/src/box/txn.c
> @@ -199,6 +199,9 @@ txn_begin()
>  	txn->engine = NULL;
>  	txn->engine_tx = NULL;
>  	txn->psql_txn = NULL;
> +	txn->entry = NULL;
> +	txn->fiber = NULL;
> +	txn->done = false;
>  	/* fiber_on_yield/fiber_on_stop initialized by engine on demand */
>  	fiber_set_txn(fiber(), txn);
>  	trigger_create(&txn->fiber_on_stop, txn_on_stop, NULL, NULL);
> @@ -359,7 +362,11 @@ txn_complete(struct txn *txn)
>  			panic("rollback trigger failed");
>  		}
>  		fiber_set_txn(fiber(), NULL);
> -
> +		txn->done = true;
> +		if (txn->fiber == NULL)
> +			txn_free(txn);
> +		else if (txn->fiber != fiber())
> +			fiber_wakeup(txn->fiber);

Some comments would be really appreciated, something like:

	if (txn->fiber != NULL) {
		/*
		 * Wake up the initiating fiber - it will free
		 * the transaction memory. Note, this function
		 * may be called by the initiating fiber itself,
		 * e.g. for wal_mode=none mode, in which case
		 * we don't need to call fiber_wakeup().
		 */
		if (txn->fiber != fiber())
			fiber_wakeup(txn->fiber);
	} else {
		/*
		 * This is either rollback, in which case we
		 * must free memory, or an async transaction
		 * completion. In the latter case the fiber that
		 * initiated the transaction is long gone so
		 * it's our responsibility to clean up.
		 */
		txn_free(txn);
	}

>  		return;
>  	}
>  	/*
> @@ -368,6 +375,15 @@ txn_complete(struct txn *txn)
>  	 */
>  	if (txn->engine != NULL)
>  		engine_commit(txn->engine, txn);
> +
> +	ev_tstamp stop_tm = ev_monotonic_now(loop());
> +	if (stop_tm - txn->start_tm > too_long_threshold) {
> +		int n_rows = txn->n_new_rows + txn->n_applier_rows;
> +		say_warn_ratelimited("too long WAL write: %d rows at "
> +				     "LSN %lld: %.3f sec", n_rows,
> +				     txn->signature - n_rows + 1,
> +				     stop_tm - txn->start_tm);
> +	}
>  	/*
>  	 * Some of triggers require for in_txn variable is set so
>  	 * restore it for time a trigger is in progress.
> @@ -378,6 +394,7 @@ txn_complete(struct txn *txn)
>  	 * may throw. In case an error has happened, there is
>  	 * no other option but terminate.
>  	 */
> +	fiber_set_txn(fiber(), txn);

It's already set, just a few lines above. Added by the previous patch
AFAIR.

>  	if (txn->has_triggers &&
>  	    trigger_run(&txn->on_commit, txn) != 0) {
>  		diag_log();
> @@ -386,35 +403,41 @@ txn_complete(struct txn *txn)
>  	}
>
>  	fiber_set_txn(fiber(), NULL);
> +	txn->done = true;
> +	if (txn->fiber == NULL)
> +		txn_free(txn);
> +	else if (txn->fiber != fiber())
> +		fiber_wakeup(txn->fiber);

Please re-factor your code to avoid copy-and-paste. I assume you could
move it to txn_entry_done_cb.

>  }
>
>  static void
>  txn_entry_done_cb(struct journal_entry *entry, void *data)
>  {
>  	struct txn *txn = (struct txn *)data;
> +	assert(txn->entry == entry);
>  	txn->signature = entry->res;
>  	txn_complete(txn);
>  }
> @@ -467,32 +481,63 @@ txn_commit(struct txn *txn)
>  	 * we have a bunch of IPROTO_NOP statements.
>  	 */
>  	if (txn->engine != NULL) {
> -		if (engine_prepare(txn->engine, txn) != 0)
> -			goto fail;
> +		if (engine_prepare(txn->engine, txn) != 0) {
> +			return -1;
> +		}
>  	}
>  	trigger_clear(&txn->fiber_on_stop);
> +	return 0;
> +}
>
> -	fiber_set_txn(fiber(), NULL);
> -	if (txn->n_new_rows + txn->n_applier_rows > 0) {
> -		txn->signature = txn_write_to_wal(txn);
> -		if (txn->signature < 0)
> -			return -1;
> -	} else {
> -		/*
> -		 * However there is noting to write to wal a completion
> -		 * should be fired.
> -		 */
> +/*
> + * Send a transaction to a journal.
> + */
> +int
> +txn_write(struct txn *txn)

I'd call it txn_commit_async, because it's just like txn_commit, but
asynchronous.

> +{
> +	if (txn_prepare(txn) != 0)
> +		goto fail;
> +
> +	txn->start_tm = ev_monotonic_now(loop());
> +	if (txn->n_new_rows + txn->n_applier_rows == 0) {
> +		/* Nothing to do. */
>  		txn->signature = 0;
>  		txn_complete(txn);
> +		return 0;
>  	}
>
> -	txn_free(txn);
> +	if (txn_journal_write(txn) != 0)
> +		return -1;
> +	fiber_set_txn(fiber(), NULL);
>  	return 0;
>  fail:
>  	txn_rollback(txn);
>  	return -1;
>  }
>
> +int
> +txn_commit(struct txn *txn)
> +{
> +	txn->fiber = fiber();
> +
> +	if (txn_write(txn) != 0)
> +		return -1;
> +	/*
> +	 * In case of non-yielding journal the transaction could already
> +	 * be done and there is nothing to wait in such cases.
> +	 */
> +	if (!txn->done) {
> +		bool cancellable = fiber_set_cancellable(false);
> +		fiber_yield();
> +		fiber_set_cancellable(cancellable);
> +	}
> +	int res = txn->signature >= 0? 0: -1;
> +	if (res != 0)
> +		diag_set(ClientError, ER_WAL_IO);
> +	txn_free(txn);
> +	return res;
> +}
> +
>  void
>  txn_rollback_stmt(struct txn *txn)
>  {
> @@ -505,11 +550,9 @@ txn_rollback_stmt(struct txn *txn)
>  void
>  txn_rollback(struct txn *txn)
>  {
> -	assert(txn == in_txn());
>  	trigger_clear(&txn->fiber_on_stop);
>  	txn->signature = -1;
>  	txn_complete(txn);
> -	txn_free(txn);
>  }
>
>  void
> diff --git a/src/box/txn.h b/src/box/txn.h
> index 569978ce9..bd6b695a9 100644
> --- a/src/box/txn.h
> +++ b/src/box/txn.h
> @@ -195,6 +195,16 @@ struct txn {
>  	/** Commit and rollback triggers */
>  	struct rlist on_commit, on_rollback;
>  	struct sql_txn *psql_txn;
> +	/** Journal entry to control txn write. */
> +	struct journal_entry *entry;

Why do you need to add txn->entry? Why not allocate it on txn->region?
You just need it to stay alive until the transaction is freed. You don't
need to store a pointer to it in txn AFAICS.

> +	/** Transaction completion trigger. */
> +	struct trigger entry_done;

This trigger is never used.

> +	/** Timestampt of entry write start. */
> +	ev_tstamp start_tm;
> +	/* A fiber to wake up when transaction is finished. */
> +	struct fiber *fiber;

It can be NULL. Please explain in what cases.

> +	/* True when transaction is processed. */
> +	bool done;
>  };
>
>  /* Pointer to the current transaction (if any) */
> @@ -228,6 +238,12 @@ txn_commit(struct txn *txn);
>  void
>  txn_rollback(struct txn *txn);
>
> +int
> +txn_write(struct txn *txn);
> +
> +int
> +txn_wait(struct txn *txn);
> +

txn_wait isn't defined.

>  /**
>   * Roll back the transaction but keep the object around.
>   * A special case for memtx transaction abort on yield. In this
> diff --git a/src/box/wal.c b/src/box/wal.c
> index 62b6391fd..582ae4598 100644
> --- a/src/box/wal.c
> +++ b/src/box/wal.c
> @@ -265,7 +265,6 @@ tx_schedule_f(va_list ap)
>  					     struct journal_entry, fifo);
>  		if (req->on_done_cb != NULL)
>  			req->on_done_cb(req, req->on_done_cb_data);
> -		fiber_wakeup(req->fiber);
>  	}
>  	writer->is_in_rollback = false;
>  	fiber_cond_wait(&writer->schedule_cond);
> @@ -274,7 +273,7 @@ tx_schedule_f(va_list ap)
>  }
>
>  /**
> - * Attach requests to a scheduling queue.
> + * Signal done condition.
>   */
>  static void
>  tx_schedule_queue(struct stailq *queue)
> @@ -380,7 +379,8 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
>  	writer->wal_max_size = wal_max_size;
>  	writer->is_in_rollback = false;
>  	journal_create(&writer->base, wal_mode == WAL_NONE ?
> -		       wal_write_in_wal_mode_none : wal_write, NULL);
> +		       wal_write_in_wal_mode_none : wal_write,
> +		       NULL);

Why change this?
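
Regarding the copy-and-paste remark on txn_complete() above: the
following is only a minimal sketch of one way the duplicated
"done / free / wake up" block could live in a single helper. It is not
the patch's code. The helper name txn_finish() is hypothetical, and
struct fiber, fiber(), fiber_wakeup() and txn_free() are reduced to
mock stand-ins (the mock txn_free() only sets a flag) so that the
snippet compiles on its own, outside the tarantool tree.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Mock stand-in for the real fiber type: only counts wakeups. */
struct fiber {
	int wakeups;
};

/* Reduced, hypothetical version of struct txn. */
struct txn {
	/* Fiber waiting in txn_commit(); NULL for rollback or async commit. */
	struct fiber *fiber;
	/* True once the transaction has been processed. */
	bool done;
	/* Mock only: set by txn_free() instead of releasing memory. */
	bool freed;
};

/* Mock of the currently running fiber. */
static struct fiber *current_fiber;

static struct fiber *
fiber(void)
{
	return current_fiber;
}

static void
fiber_wakeup(struct fiber *f)
{
	f->wakeups++;
}

static void
txn_free(struct txn *txn)
{
	txn->freed = true;
}

/*
 * Hypothetical helper holding the completion logic in one place,
 * so both the rollback and the commit path can call it.
 */
static void
txn_finish(struct txn *txn)
{
	txn->done = true;
	if (txn->fiber == NULL) {
		/* Nobody is waiting: the completing side cleans up. */
		txn_free(txn);
	} else if (txn->fiber != fiber()) {
		/* The waiting fiber frees the transaction after wakeup. */
		fiber_wakeup(txn->fiber);
	}
	/* Otherwise the initiating fiber is running; nothing to wake. */
}
```

With a helper like this, both branches of txn_complete() would end in
a single call, and the comment about when txn->fiber is NULL would need
to be written only once.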