On Thursday, June 20, 2019 5:08:48 PM MSK Vladimir Davydov wrote: > On Thu, Jun 20, 2019 at 12:23:13AM +0300, Georgy Kirichenko wrote: > > Finalize a transaction thorough a journal entry callback. So transaction > > processing doesn't rely on fiber schedule. This also enforce transaction > > finalization order as triggers might fail. > > > > Prerequisites: #1254 > > --- > > > > src/box/alter.cc | 5 +++ > > src/box/box.cc | 7 ++- > > src/box/journal.c | 10 ++++- > > src/box/journal.h | 12 ++++- > > src/box/txn.c | 112 ++++++++++++++++++++++++++++++---------------- > > src/box/vy_log.c | 3 +- > > src/box/wal.c | 20 ++++++++- > > 7 files changed, 122 insertions(+), 47 deletions(-) > > In general, looks fine to me. A few minor comments below. > > > diff --git a/src/box/alter.cc b/src/box/alter.cc > > index a37a68ce4..aa6a79264 100644 > > --- a/src/box/alter.cc > > +++ b/src/box/alter.cc > > @@ -3558,6 +3558,11 @@ unlock_after_dd(struct trigger *trigger, void > > *event)> > > { > > > > (void) trigger; > > (void) event; > > > > + /* > > + * A trigger could be processed by the wal scheduler fiber > > + * so steal the latch first. > > + */ > > Not "could be", but "is processed", I guess. There are two cases: for non-yielding journal (wal none or recovery) it would be the same fiber but for yielding one - this would be a tx_prio callback. > > > + latch_steal(&schema_lock); > > > > latch_unlock(&schema_lock); > > > > } > > > > diff --git a/src/box/box.cc b/src/box/box.cc > > index 5e5cd2b08..2994363ab 100644 > > --- a/src/box/box.cc > > +++ b/src/box/box.cc > > @@ -309,10 +309,13 @@ struct recovery_journal { > > > > */ > > > > static int64_t > > recovery_journal_write(struct journal *base, > > > > - struct journal_entry * /* entry */) > > + struct journal_entry *entry) > > > > { > > > > struct recovery_journal *journal = (struct recovery_journal *) base; > > > > - return vclock_sum(journal->vclock); > > + entry->res = vclock_sum(journal->vclock); > > + if (entry->on_done_cb) > > + entry->on_done_cb(entry, entry->on_done_cb_data); > > This can't be - on_done_cb must always be set. > > Note, although vy_log uses journal_entry, they don't go through this > path. Ok, so I will add an assert instead of the condition. > > > + return entry->res; > > Shouldn't it return 0? BTW it looks like you need to update the comment > to journal_write(). Accepted > > > } > > > > static inline void > > > > diff --git a/src/box/journal.c b/src/box/journal.c > > index fe13fb6ee..eb0db9af2 100644 > > --- a/src/box/journal.c > > +++ b/src/box/journal.c > > @@ -41,7 +41,9 @@ static int64_t > > > > dummy_journal_write(struct journal *journal, struct journal_entry *entry) > > { > > > > (void) journal; > > > > - (void) entry; > > + entry->res = 0; > > + if (entry->on_done_cb) > > + entry->on_done_cb(entry, entry->on_done_cb_data); > > > > return 0; > > > > } > > > > @@ -53,7 +55,9 @@ static struct journal dummy_journal = { > > > > struct journal *current_journal = &dummy_journal; > > > > struct journal_entry * > > > > -journal_entry_new(size_t n_rows, struct region *region) > > +journal_entry_new(size_t n_rows, struct region *region, > > + void (*on_done_cb)(struct journal_entry *entry, void *data), > > + void *on_done_cb_data) > > > > { > > > > struct journal_entry *entry; > > > > @@ -70,6 +74,8 @@ journal_entry_new(size_t n_rows, struct region *region) > > > > entry->n_rows = n_rows; > > entry->res = -1; > > entry->fiber = fiber(); > > > > + entry->on_done_cb = on_done_cb; > > + entry->on_done_cb_data = on_done_cb_data; > > > > return entry; > > > > } > > > > diff --git a/src/box/journal.h b/src/box/journal.h > > index 8ac32ee5e..b704b5c67 100644 > > --- a/src/box/journal.h > > +++ b/src/box/journal.h > > @@ -58,6 +58,14 @@ struct journal_entry { > > > > * The fiber issuing the request. > > */ > > > > struct fiber *fiber; > > > > + /** > > + * A journal entry completion callback. > > + */ > > I think we should elaborate the comment. When is this callback called? > Is it called on failure? Is it called from the same fiber or some other? Ok > > > + void (*on_done_cb)(struct journal_entry *entry, void *data); > > Let's please add a typedef for on_done_cb - you use its signature pretty > often. Ok > > > + /** > > + * A journal entry completion callback argument. > > + */ > > + void *on_done_cb_data; > > > > /** > > > > * Approximate size of this request when encoded. > > */ > > > > @@ -80,7 +88,9 @@ struct region; > > > > * @return NULL if out of memory, fiber diagnostics area is set > > */ > > > > struct journal_entry * > > > > -journal_entry_new(size_t n_rows, struct region *region); > > +journal_entry_new(size_t n_rows, struct region *region, > > + void (*on_done_cb)(struct journal_entry *entry, void *data), > > + void *on_done_cb_data); > > > > /** > > > > * An API for an abstract journal for all transactions of this > > > > diff --git a/src/box/txn.c b/src/box/txn.c > > index 21f7e98b4..52e16f3e6 100644 > > --- a/src/box/txn.c > > +++ b/src/box/txn.c > > > > @@ -337,6 +337,66 @@ fail: > > return -1; > > > > } > > > > +/** > > + * Complete transaction processing. > > + */ > > +static void > > +txn_complete(struct txn *txn) > > +{ > > + if (txn->signature < 0) { > > + if (txn->engine) > > + engine_rollback(txn->engine, txn); > > + /* > > + * Some of triggers require for in_txn variable is set so > > + * restore it for time a trigger is in progress. > > + */ > > + fiber_set_txn(fiber(), txn); > > + /* Rollback triggers must not throw. */ > > + if (txn->has_triggers && > > + trigger_run(&txn->on_rollback, txn) != 0) { > > + diag_log(); > > + unreachable(); > > + panic("rollback trigger failed"); > > + } > > + fiber_set_txn(fiber(), NULL); > > + > > + return; > > + } > > + /* > > + * Engine can be NULL if transaction contains IPROTO_NOP > > + * statements only. > > + */ > > + if (txn->engine != NULL) > > + engine_commit(txn->engine, txn); > > + /* > > + * Some of triggers require for in_txn variable is set so > > + * restore it for time a trigger is in progress. > > + */ > > + fiber_set_txn(fiber(), txn); > > copy-and-paste... May be, add a helper function for running triggers? Accepted > > > + /* > > + * The transaction is in the binary log. No action below > > + * may throw. In case an error has happened, there is > > + * no other option but terminate. > > + */ > > + if (txn->has_triggers && > > + trigger_run(&txn->on_commit, txn) != 0) { > > + diag_log(); > > + unreachable(); > > + panic("commit trigger failed"); > > + } > > + > > + fiber_set_txn(fiber(), NULL); > > +} > > + > > +static void > > +txn_entry_done_cb(struct journal_entry *entry, void *data) > > +{ > > + struct txn *txn = (struct txn *)data; > > + txn->signature = entry->res; > > + txn_complete(txn); > > +} > > + > > + > > > > static int64_t > > txn_write_to_wal(struct txn *txn) > > { > > > > @@ -418,31 +472,20 @@ txn_commit(struct txn *txn) > > > > } > > trigger_clear(&txn->fiber_on_stop); > > > > + fiber_set_txn(fiber(), NULL); > > > > if (txn->n_new_rows + txn->n_applier_rows > 0) { > > > > txn->signature = txn_write_to_wal(txn); > > if (txn->signature < 0) > > > > return -1; > > > > + } else { > > + /* > > + * However there is noting to write to wal a completion > > s/noting/nothing > > punctuation marks missing > > > + * should be fired. > > + */ > > + txn->signature = 0; > > + txn_complete(txn); > > > > } > > > > - /* > > - * Engine can be NULL if transaction contains IPROTO_NOP > > - * statements only. > > - */ > > - if (txn->engine != NULL) > > - engine_commit(txn->engine, txn); > > - /* > > - * The transaction is in the binary log. No action below > > - * may throw. In case an error has happened, there is > > - * no other option but terminate. > > - */ > > - if (txn->has_triggers && > > - trigger_run(&txn->on_commit, txn) != 0) { > > - diag_log(); > > - unreachable(); > > - panic("commit trigger failed"); > > - } > > - > > > > - fiber_set_txn(fiber(), NULL); > > > > txn_free(txn); > > return 0; > > > > fail: > > diff --git a/src/box/wal.c b/src/box/wal.c > > index 71f6dbb5c..62b6391fd 100644 > > --- a/src/box/wal.c > > +++ b/src/box/wal.c > > @@ -263,6 +263,8 @@ tx_schedule_f(va_list ap) > > > > struct journal_entry *req = > > > > stailq_shift_entry(&writer->schedule_queue, > > > > struct journal_entry, fifo); > > > > + if (req->on_done_cb != NULL) > > + req->on_done_cb(req, req->on_done_cb_data); > > > > fiber_wakeup(req->fiber); > > > > } > > writer->is_in_rollback = false; > > > > @@ -1158,7 +1160,12 @@ wal_write(struct journal *journal, struct > > journal_entry *entry)> > > { > > > > struct wal_writer *writer = (struct wal_writer *) journal; > > > > - ERROR_INJECT_RETURN(ERRINJ_WAL_IO); > > + ERROR_INJECT(ERRINJ_WAL_IO, { > > + entry->res = -1; > > + if (entry->on_done_cb != NULL) > > + entry->on_done_cb(entry, entry->on_done_cb_data); > > + return -1; > > + }); > > > > if (writer->is_in_rollback) { > > > > /* > > > > @@ -1171,6 +1178,9 @@ wal_write(struct journal *journal, struct > > journal_entry *entry)> > > say_error("Aborting transaction %llu during " > > > > "cascading rollback", > > vclock_sum(&writer->vclock)); > > > > + entry->res = -1; > > + if (entry->on_done_cb != NULL) > > + entry->on_done_cb(entry, entry->on_done_cb_data); > > Could you please add 'goto fail' so as not to duplicate code. > > I would also add a helper function journal_entry_complete() that would > call the callback - would look neater that way IMO. Accepted > > > return -1; > > > > } > > > > @@ -1185,6 +1195,9 @@ wal_write(struct journal *journal, struct > > journal_entry *entry)> > > if (batch == NULL) { > > > > diag_set(OutOfMemory, sizeof(struct wal_msg), > > > > "region", "struct wal_msg"); > > > > + entry->res = -1; > > + if (entry->on_done_cb != NULL) > > + entry->on_done_cb(entry, entry->on_done_cb_data); > > > > return -1; > > > > } > > wal_msg_create(batch); > > > > @@ -1222,7 +1235,10 @@ wal_write_in_wal_mode_none(struct journal *journal, > > > > entry->rows + entry->n_rows); > > > > vclock_merge(&writer->vclock, &vclock_diff); > > vclock_copy(&replicaset.vclock, &writer->vclock); > > > > - return vclock_sum(&writer->vclock); > > + entry->res = vclock_sum(&writer->vclock); > > + if (entry->on_done_cb) > > + entry->on_done_cb(entry, entry->on_done_cb_data); > > + return entry->res; > > > > }