From mboxrd@z Thu Jan 1 00:00:00 1970
Return-Path:
Date: Thu, 20 Jun 2019 17:08:48 +0300
From: Vladimir Davydov
Subject: Re: [tarantool-patches] [PATCH v4 6/9] wal: introduce a journal entry finalization callback
Message-ID: <20190620140848.77crsufqebxhf4q2@esperanza>
References: <079a143aa17e185c44c5fe9c53ef207d7988fe56.1560978655.git.georgy@tarantool.org>
MIME-Version: 1.0
Content-Type: text/plain; charset=us-ascii
Content-Disposition: inline
In-Reply-To: <079a143aa17e185c44c5fe9c53ef207d7988fe56.1560978655.git.georgy@tarantool.org>
To: Georgy Kirichenko
Cc: tarantool-patches@freelists.org
List-ID:

On Thu, Jun 20, 2019 at 12:23:13AM +0300, Georgy Kirichenko wrote:
> Finalize a transaction thorough a journal entry callback. So transaction
> processing doesn't rely on fiber schedule. This also enforce transaction
> finalization order as triggers might fail.
>
> Prerequisites: #1254
> ---
>  src/box/alter.cc  |   5 +++
>  src/box/box.cc    |   7 ++-
>  src/box/journal.c |  10 ++++-
>  src/box/journal.h |  12 ++++-
>  src/box/txn.c     | 112 ++++++++++++++++++++++++++++++----------------
>  src/box/vy_log.c  |   3 +-
>  src/box/wal.c     |  20 ++++++++-
>  7 files changed, 122 insertions(+), 47 deletions(-)

In general, looks fine to me. A few minor comments below.

>
> diff --git a/src/box/alter.cc b/src/box/alter.cc
> index a37a68ce4..aa6a79264 100644
> --- a/src/box/alter.cc
> +++ b/src/box/alter.cc
> @@ -3558,6 +3558,11 @@ unlock_after_dd(struct trigger *trigger, void *event)
>  {
>  	(void) trigger;
>  	(void) event;
> +	/*
> +	 * A trigger could be processed by the wal scheduler fiber
> +	 * so steal the latch first.
> +	 */

Not "could be", but "is processed", I guess.

> +	latch_steal(&schema_lock);
>  	latch_unlock(&schema_lock);
>  }
>
> diff --git a/src/box/box.cc b/src/box/box.cc
> index 5e5cd2b08..2994363ab 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -309,10 +309,13 @@ struct recovery_journal {
>   */
>  static int64_t
>  recovery_journal_write(struct journal *base,
> -		       struct journal_entry * /* entry */)
> +		       struct journal_entry *entry)
>  {
>  	struct recovery_journal *journal = (struct recovery_journal *) base;
> -	return vclock_sum(journal->vclock);
> +	entry->res = vclock_sum(journal->vclock);
> +	if (entry->on_done_cb)
> +		entry->on_done_cb(entry, entry->on_done_cb_data);

This can't be NULL: on_done_cb must always be set. Note that although
vy_log uses journal_entry too, its entries don't go through this path.

> +	return entry->res;

Shouldn't it return 0? BTW it looks like you need to update the comment
to journal_write().

>  }
>
>  static inline void
> diff --git a/src/box/journal.c b/src/box/journal.c
> index fe13fb6ee..eb0db9af2 100644
> --- a/src/box/journal.c
> +++ b/src/box/journal.c
> @@ -41,7 +41,9 @@ static int64_t
>  dummy_journal_write(struct journal *journal, struct journal_entry *entry)
>  {
>  	(void) journal;
> -	(void) entry;
> +	entry->res = 0;
> +	if (entry->on_done_cb)
> +		entry->on_done_cb(entry, entry->on_done_cb_data);
>  	return 0;
>  }
>
> @@ -53,7 +55,9 @@ static struct journal dummy_journal = {
>  struct journal *current_journal = &dummy_journal;
>
>  struct journal_entry *
> -journal_entry_new(size_t n_rows, struct region *region)
> +journal_entry_new(size_t n_rows, struct region *region,
> +		  void (*on_done_cb)(struct journal_entry *entry, void *data),
> +		  void *on_done_cb_data)
>  {
>  	struct journal_entry *entry;
>
> @@ -70,6 +74,8 @@ journal_entry_new(size_t n_rows, struct region *region)
>  	entry->n_rows = n_rows;
>  	entry->res = -1;
>  	entry->fiber = fiber();
> +	entry->on_done_cb = on_done_cb;
> +	entry->on_done_cb_data = on_done_cb_data;
>  	return entry;
>  }
>
> diff --git a/src/box/journal.h b/src/box/journal.h
> index 8ac32ee5e..b704b5c67 100644
> --- a/src/box/journal.h
> +++ b/src/box/journal.h
> @@ -58,6 +58,14 @@ struct journal_entry {
>  	 * The fiber issuing the request.
>  	 */
>  	struct fiber *fiber;
> +	/**
> +	 * A journal entry completion callback.
> +	 */

I think we should elaborate the comment. When is this callback called?
Is it called on failure? Is it called from the same fiber or some other?

> +	void (*on_done_cb)(struct journal_entry *entry, void *data);

Please add a typedef for on_done_cb: its signature is repeated in
several places.

> +	/**
> +	 * A journal entry completion callback argument.
> +	 */
> +	void *on_done_cb_data;
>  	/**
>  	 * Approximate size of this request when encoded.
>  	 */
> @@ -80,7 +88,9 @@ struct region;
>   * @return NULL if out of memory, fiber diagnostics area is set
>   */
>  struct journal_entry *
> -journal_entry_new(size_t n_rows, struct region *region);
> +journal_entry_new(size_t n_rows, struct region *region,
> +		  void (*on_done_cb)(struct journal_entry *entry, void *data),
> +		  void *on_done_cb_data);
>
>  /**
>   * An API for an abstract journal for all transactions of this
> diff --git a/src/box/txn.c b/src/box/txn.c
> index 21f7e98b4..52e16f3e6 100644
> --- a/src/box/txn.c
> +++ b/src/box/txn.c
> @@ -337,6 +337,66 @@ fail:
>  	return -1;
>  }
>
> +/**
> + * Complete transaction processing.
> + */
> +static void
> +txn_complete(struct txn *txn)
> +{
> +	if (txn->signature < 0) {
> +		if (txn->engine)
> +			engine_rollback(txn->engine, txn);
> +		/*
> +		 * Some of triggers require for in_txn variable is set so
> +		 * restore it for time a trigger is in progress.
> +		 */
> +		fiber_set_txn(fiber(), txn);
> +		/* Rollback triggers must not throw. */
> +		if (txn->has_triggers &&
> +		    trigger_run(&txn->on_rollback, txn) != 0) {
> +			diag_log();
> +			unreachable();
> +			panic("rollback trigger failed");
> +		}
> +		fiber_set_txn(fiber(), NULL);
> +
> +		return;
> +	}
> +	/*
> +	 * Engine can be NULL if transaction contains IPROTO_NOP
> +	 * statements only.
> +	 */
> +	if (txn->engine != NULL)
> +		engine_commit(txn->engine, txn);
> +	/*
> +	 * Some of triggers require for in_txn variable is set so
> +	 * restore it for time a trigger is in progress.
> +	 */
> +	fiber_set_txn(fiber(), txn);

Copy-and-paste... Maybe add a helper function for running triggers?

> +	/*
> +	 * The transaction is in the binary log. No action below
> +	 * may throw. In case an error has happened, there is
> +	 * no other option but terminate.
> +	 */
> +	if (txn->has_triggers &&
> +	    trigger_run(&txn->on_commit, txn) != 0) {
> +		diag_log();
> +		unreachable();
> +		panic("commit trigger failed");
> +	}
> +
> +	fiber_set_txn(fiber(), NULL);
> +}
> +
> +static void
> +txn_entry_done_cb(struct journal_entry *entry, void *data)
> +{
> +	struct txn *txn = (struct txn *)data;
> +	txn->signature = entry->res;
> +	txn_complete(txn);
> +}
> +
> +
>  static int64_t
>  txn_write_to_wal(struct txn *txn)
>  {
> @@ -418,31 +472,20 @@ txn_commit(struct txn *txn)
>  	}
>  	trigger_clear(&txn->fiber_on_stop);
>
> +	fiber_set_txn(fiber(), NULL);
>  	if (txn->n_new_rows + txn->n_applier_rows > 0) {
>  		txn->signature = txn_write_to_wal(txn);
>  		if (txn->signature < 0)
>  			return -1;
> +	} else {
> +		/*
> +		 * However there is noting to write to wal a completion

s/noting/nothing/
Punctuation marks are missing, too.

> +		 * should be fired.
> +		 */
> +		txn->signature = 0;
> +		txn_complete(txn);
>  	}
> -	/*
> -	 * Engine can be NULL if transaction contains IPROTO_NOP
> -	 * statements only.
> -	 */
> -	if (txn->engine != NULL)
> -		engine_commit(txn->engine, txn);
> -	/*
> -	 * The transaction is in the binary log. No action below
> -	 * may throw. In case an error has happened, there is
> -	 * no other option but terminate.
> -	 */
> -	if (txn->has_triggers &&
> -	    trigger_run(&txn->on_commit, txn) != 0) {
> -		diag_log();
> -		unreachable();
> -		panic("commit trigger failed");
> -	}
> -
>
> -	fiber_set_txn(fiber(), NULL);
>  	txn_free(txn);
>  	return 0;
>  fail:
> diff --git a/src/box/wal.c b/src/box/wal.c
> index 71f6dbb5c..62b6391fd 100644
> --- a/src/box/wal.c
> +++ b/src/box/wal.c
> @@ -263,6 +263,8 @@ tx_schedule_f(va_list ap)
>  			struct journal_entry *req =
>  				stailq_shift_entry(&writer->schedule_queue,
>  						   struct journal_entry, fifo);
> +			if (req->on_done_cb != NULL)
> +				req->on_done_cb(req, req->on_done_cb_data);
>  			fiber_wakeup(req->fiber);
>  		}
>  		writer->is_in_rollback = false;
> @@ -1158,7 +1160,12 @@ wal_write(struct journal *journal, struct journal_entry *entry)
>  {
>  	struct wal_writer *writer = (struct wal_writer *) journal;
>
> -	ERROR_INJECT_RETURN(ERRINJ_WAL_IO);
> +	ERROR_INJECT(ERRINJ_WAL_IO, {
> +		entry->res = -1;
> +		if (entry->on_done_cb != NULL)
> +			entry->on_done_cb(entry, entry->on_done_cb_data);
> +		return -1;
> +	});
>
>  	if (writer->is_in_rollback) {
>  		/*
> @@ -1171,6 +1178,9 @@ wal_write(struct journal *journal, struct journal_entry *entry)
>  		say_error("Aborting transaction %llu during "
>  			  "cascading rollback",
>  			  vclock_sum(&writer->vclock));
> +		entry->res = -1;
> +		if (entry->on_done_cb != NULL)
> +			entry->on_done_cb(entry, entry->on_done_cb_data);

Could you please add 'goto fail' so as not to duplicate the code? I
would also add a helper function journal_entry_complete() that would
call the callback. It would look neater that way IMO.
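
To make the two suggestions concrete, here is a rough, standalone sketch of what they could look like together. Everything in it is an assumption for illustration: the typedef name journal_entry_done_cb, the trimmed-down struct journal_entry, the boolean parameters standing in for the real failure conditions, and the example callback record_result_cb are not taken from the patch or from the Tarantool sources.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

struct journal_entry;

/* Hypothetical typedef name for the repeated callback signature. */
typedef void (*journal_entry_done_cb)(struct journal_entry *entry, void *data);

/* Trimmed-down stand-in for struct journal_entry (illustration only). */
struct journal_entry {
	int64_t res;
	journal_entry_done_cb on_done_cb;
	void *on_done_cb_data;
};

/* Proposed helper: fire the completion callback once entry->res is set. */
static inline void
journal_entry_complete(struct journal_entry *entry)
{
	/* The callback is expected to be always set. */
	assert(entry->on_done_cb != NULL);
	entry->on_done_cb(entry, entry->on_done_cb_data);
}

/*
 * Stand-in for wal_write(): the two flags replace the real checks
 * (cascading rollback in progress, batch allocation failure).
 */
static int64_t
wal_write_sketch(struct journal_entry *entry, bool is_in_rollback,
		 bool alloc_failed)
{
	if (is_in_rollback)
		goto fail;
	if (alloc_failed)
		goto fail;
	/* Success: the entry is queued, completion is signalled later. */
	return 0;
fail:
	/* Single failure path shared by all early exits. */
	entry->res = -1;
	journal_entry_complete(entry);
	return -1;
}

/* Example callback: copy the result into the int64_t passed as data. */
static void
record_result_cb(struct journal_entry *entry, void *data)
{
	*(int64_t *)data = entry->res;
}
```

With this shape every failure branch collapses to a single goto, and the callback invocation lives in one place.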

>  		return -1;
>  	}
>
> @@ -1185,6 +1195,9 @@ wal_write(struct journal *journal, struct journal_entry *entry)
>  	if (batch == NULL) {
>  		diag_set(OutOfMemory, sizeof(struct wal_msg),
>  			 "region", "struct wal_msg");
> +		entry->res = -1;
> +		if (entry->on_done_cb != NULL)
> +			entry->on_done_cb(entry, entry->on_done_cb_data);
>  		return -1;
>  	}
>  	wal_msg_create(batch);
> @@ -1222,7 +1235,10 @@ wal_write_in_wal_mode_none(struct journal *journal,
>  		       entry->rows + entry->n_rows);
>  	vclock_merge(&writer->vclock, &vclock_diff);
>  	vclock_copy(&replicaset.vclock, &writer->vclock);
> -	return vclock_sum(&writer->vclock);
> +	entry->res = vclock_sum(&writer->vclock);
> +	if (entry->on_done_cb)
> +		entry->on_done_cb(entry, entry->on_done_cb_data);
> +	return entry->res;
>  }
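
Regarding the duplicated trigger-running code in txn_complete(), the helper could look roughly like the standalone sketch below. It is only an illustration under stated assumptions: struct txn is cut down to the fields needed here, a single function pointer replaces a trigger list, the global current_txn replaces the fiber-local in_txn set by fiber_set_txn(), fprintf()/abort() replace diag_log()/panic(), and the names txn_run_triggers, txn_complete_sketch, count_commit and count_rollback are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

struct txn;

/* Stand-in for a trigger list: one function, non-zero means failure. */
typedef int (*txn_trigger_f)(struct txn *txn);

/* Trimmed-down stand-in for struct txn (illustration only). */
struct txn {
	int64_t signature;
	bool has_triggers;
	txn_trigger_f on_commit;
	txn_trigger_f on_rollback;
};

/* Stand-in for the fiber-local in_txn variable. */
static struct txn *current_txn;

/* Run one kind of trigger with in_txn restored for its duration. */
static void
txn_run_triggers(struct txn *txn, txn_trigger_f trigger, const char *what)
{
	assert(current_txn == NULL);
	current_txn = txn;
	/* Triggers must not fail here; terminate if one does. */
	if (txn->has_triggers && trigger != NULL && trigger(txn) != 0) {
		fprintf(stderr, "%s trigger failed\n", what);
		abort();
	}
	current_txn = NULL;
}

/* Shape of txn_complete() once both branches share the helper. */
static void
txn_complete_sketch(struct txn *txn)
{
	if (txn->signature < 0) {
		/* engine_rollback() would be called here. */
		txn_run_triggers(txn, txn->on_rollback, "rollback");
		return;
	}
	/* engine_commit() would be called here. */
	txn_run_triggers(txn, txn->on_commit, "commit");
}

/* Example triggers: count calls that observe in_txn set to their txn. */
static int commit_calls;
static int rollback_calls;

static int
count_commit(struct txn *txn)
{
	commit_calls += (current_txn == txn);
	return 0;
}

static int
count_rollback(struct txn *txn)
{
	rollback_calls += (current_txn == txn);
	return 0;
}
```

The commit and rollback branches then differ only in the engine call and in which trigger is passed to the helper.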