[tarantool-patches] [PATCH v4 6/9] wal: introduce a journal entry finalization callback
Георгий Кириченко
georgy at tarantool.org
Thu Jun 20 23:22:14 MSK 2019
On Thursday, June 20, 2019 5:08:48 PM MSK Vladimir Davydov wrote:
> On Thu, Jun 20, 2019 at 12:23:13AM +0300, Georgy Kirichenko wrote:
> > Finalize a transaction through a journal entry callback, so transaction
> > processing doesn't rely on fiber scheduling. This also enforces transaction
> > finalization order, as triggers might fail.
> >
> > Prerequisites: #1254
> > ---
> >
> > src/box/alter.cc | 5 +++
> > src/box/box.cc | 7 ++-
> > src/box/journal.c | 10 ++++-
> > src/box/journal.h | 12 ++++-
> > src/box/txn.c | 112 ++++++++++++++++++++++++++++++----------------
> > src/box/vy_log.c | 3 +-
> > src/box/wal.c | 20 ++++++++-
> > 7 files changed, 122 insertions(+), 47 deletions(-)
>
> In general, looks fine to me. A few minor comments below.
>
> > diff --git a/src/box/alter.cc b/src/box/alter.cc
> > index a37a68ce4..aa6a79264 100644
> > --- a/src/box/alter.cc
> > +++ b/src/box/alter.cc
> > @@ -3558,6 +3558,11 @@ unlock_after_dd(struct trigger *trigger, void *event)
> > {
> >
> > (void) trigger;
> > (void) event;
> >
> > + /*
> > + * A trigger could be processed by the wal scheduler fiber
> > + * so steal the latch first.
> > + */
>
> Not "could be", but "is processed", I guess.
There are two cases: for a non-yielding journal (wal mode none or recovery) the
trigger runs in the same fiber, but for a yielding one it runs in a tx_prio callback.
>
> > + latch_steal(&schema_lock);
> >
> > latch_unlock(&schema_lock);
> >
> > }
> >
> > diff --git a/src/box/box.cc b/src/box/box.cc
> > index 5e5cd2b08..2994363ab 100644
> > --- a/src/box/box.cc
> > +++ b/src/box/box.cc
> > @@ -309,10 +309,13 @@ struct recovery_journal {
> >
> > */
> >
> > static int64_t
> > recovery_journal_write(struct journal *base,
> >
> > - struct journal_entry * /* entry */)
> > + struct journal_entry *entry)
> >
> > {
> >
> > struct recovery_journal *journal = (struct recovery_journal *) base;
> >
> > - return vclock_sum(journal->vclock);
> > + entry->res = vclock_sum(journal->vclock);
> > + if (entry->on_done_cb)
> > + entry->on_done_cb(entry, entry->on_done_cb_data);
>
> This can't be - on_done_cb must always be set.
>
> Note, although vy_log uses journal_entry, they don't go through this
> path.
Ok, so I will add an assert instead of the condition.
>
> > + return entry->res;
>
> Shouldn't it return 0? BTW it looks like you need to update the comment
> to journal_write().
Accepted
>
> > }
> >
> > static inline void
> >
> > diff --git a/src/box/journal.c b/src/box/journal.c
> > index fe13fb6ee..eb0db9af2 100644
> > --- a/src/box/journal.c
> > +++ b/src/box/journal.c
> > @@ -41,7 +41,9 @@ static int64_t
> >
> > dummy_journal_write(struct journal *journal, struct journal_entry *entry)
> > {
> >
> > (void) journal;
> >
> > - (void) entry;
> > + entry->res = 0;
> > + if (entry->on_done_cb)
> > + entry->on_done_cb(entry, entry->on_done_cb_data);
> >
> > return 0;
> >
> > }
> >
> > @@ -53,7 +55,9 @@ static struct journal dummy_journal = {
> >
> > struct journal *current_journal = &dummy_journal;
> >
> > struct journal_entry *
> >
> > -journal_entry_new(size_t n_rows, struct region *region)
> > +journal_entry_new(size_t n_rows, struct region *region,
> > + void (*on_done_cb)(struct journal_entry *entry, void *data),
> > + void *on_done_cb_data)
> >
> > {
> >
> > struct journal_entry *entry;
> >
> > @@ -70,6 +74,8 @@ journal_entry_new(size_t n_rows, struct region *region)
> >
> > entry->n_rows = n_rows;
> > entry->res = -1;
> > entry->fiber = fiber();
> >
> > + entry->on_done_cb = on_done_cb;
> > + entry->on_done_cb_data = on_done_cb_data;
> >
> > return entry;
> >
> > }
> >
> > diff --git a/src/box/journal.h b/src/box/journal.h
> > index 8ac32ee5e..b704b5c67 100644
> > --- a/src/box/journal.h
> > +++ b/src/box/journal.h
> > @@ -58,6 +58,14 @@ struct journal_entry {
> >
> > * The fiber issuing the request.
> > */
> >
> > struct fiber *fiber;
> >
> > + /**
> > + * A journal entry completion callback.
> > + */
>
> I think we should elaborate the comment. When is this callback called?
> Is it called on failure? Is it called from the same fiber or some other?
Ok
>
> > + void (*on_done_cb)(struct journal_entry *entry, void *data);
>
> Let's please add a typedef for on_done_cb - you use its signature pretty
> often.
Ok
>
> > + /**
> > + * A journal entry completion callback argument.
> > + */
> > + void *on_done_cb_data;
> >
> > /**
> >
> > * Approximate size of this request when encoded.
> > */
> >
> > @@ -80,7 +88,9 @@ struct region;
> >
> > * @return NULL if out of memory, fiber diagnostics area is set
> > */
> >
> > struct journal_entry *
> >
> > -journal_entry_new(size_t n_rows, struct region *region);
> > +journal_entry_new(size_t n_rows, struct region *region,
> > + void (*on_done_cb)(struct journal_entry *entry, void *data),
> > + void *on_done_cb_data);
> >
> > /**
> >
> > * An API for an abstract journal for all transactions of this
> >
> > diff --git a/src/box/txn.c b/src/box/txn.c
> > index 21f7e98b4..52e16f3e6 100644
> > --- a/src/box/txn.c
> > +++ b/src/box/txn.c
> >
> > @@ -337,6 +337,66 @@ fail:
> > return -1;
> >
> > }
> >
> > +/**
> > + * Complete transaction processing.
> > + */
> > +static void
> > +txn_complete(struct txn *txn)
> > +{
> > + if (txn->signature < 0) {
> > + if (txn->engine)
> > + engine_rollback(txn->engine, txn);
> > + /*
> > + * Some of triggers require for in_txn variable is set so
> > + * restore it for time a trigger is in progress.
> > + */
> > + fiber_set_txn(fiber(), txn);
> > + /* Rollback triggers must not throw. */
> > + if (txn->has_triggers &&
> > + trigger_run(&txn->on_rollback, txn) != 0) {
> > + diag_log();
> > + unreachable();
> > + panic("rollback trigger failed");
> > + }
> > + fiber_set_txn(fiber(), NULL);
> > +
> > + return;
> > + }
> > + /*
> > + * Engine can be NULL if transaction contains IPROTO_NOP
> > + * statements only.
> > + */
> > + if (txn->engine != NULL)
> > + engine_commit(txn->engine, txn);
> > + /*
> > + * Some of triggers require for in_txn variable is set so
> > + * restore it for time a trigger is in progress.
> > + */
> > + fiber_set_txn(fiber(), txn);
>
> copy-and-paste... Maybe add a helper function for running triggers?
Accepted
>
> > + /*
> > + * The transaction is in the binary log. No action below
> > + * may throw. In case an error has happened, there is
> > + * no other option but terminate.
> > + */
> > + if (txn->has_triggers &&
> > + trigger_run(&txn->on_commit, txn) != 0) {
> > + diag_log();
> > + unreachable();
> > + panic("commit trigger failed");
> > + }
> > +
> > + fiber_set_txn(fiber(), NULL);
> > +}
> > +
> > +static void
> > +txn_entry_done_cb(struct journal_entry *entry, void *data)
> > +{
> > + struct txn *txn = (struct txn *)data;
> > + txn->signature = entry->res;
> > + txn_complete(txn);
> > +}
> > +
> > +
> >
> > static int64_t
> > txn_write_to_wal(struct txn *txn)
> > {
> >
> > @@ -418,31 +472,20 @@ txn_commit(struct txn *txn)
> >
> > }
> > trigger_clear(&txn->fiber_on_stop);
> >
> > + fiber_set_txn(fiber(), NULL);
> >
> > if (txn->n_new_rows + txn->n_applier_rows > 0) {
> >
> > txn->signature = txn_write_to_wal(txn);
> > if (txn->signature < 0)
> >
> > return -1;
> >
> > + } else {
> > + /*
> > + * However there is noting to write to wal a completion
>
> s/noting/nothing
>
> punctuation marks missing
>
> > + * should be fired.
> > + */
> > + txn->signature = 0;
> > + txn_complete(txn);
> >
> > }
> >
> > - /*
> > - * Engine can be NULL if transaction contains IPROTO_NOP
> > - * statements only.
> > - */
> > - if (txn->engine != NULL)
> > - engine_commit(txn->engine, txn);
> > - /*
> > - * The transaction is in the binary log. No action below
> > - * may throw. In case an error has happened, there is
> > - * no other option but terminate.
> > - */
> > - if (txn->has_triggers &&
> > - trigger_run(&txn->on_commit, txn) != 0) {
> > - diag_log();
> > - unreachable();
> > - panic("commit trigger failed");
> > - }
> > -
> >
> > - fiber_set_txn(fiber(), NULL);
> >
> > txn_free(txn);
> > return 0;
> >
> > fail:
> > diff --git a/src/box/wal.c b/src/box/wal.c
> > index 71f6dbb5c..62b6391fd 100644
> > --- a/src/box/wal.c
> > +++ b/src/box/wal.c
> > @@ -263,6 +263,8 @@ tx_schedule_f(va_list ap)
> >
> > struct journal_entry *req =
> >
> > stailq_shift_entry(&writer->schedule_queue,
> >
> > struct journal_entry, fifo);
> >
> > + if (req->on_done_cb != NULL)
> > + req->on_done_cb(req, req->on_done_cb_data);
> >
> > fiber_wakeup(req->fiber);
> >
> > }
> > writer->is_in_rollback = false;
> >
> > @@ -1158,7 +1160,12 @@ wal_write(struct journal *journal, struct journal_entry *entry)
> > {
> >
> > struct wal_writer *writer = (struct wal_writer *) journal;
> >
> > - ERROR_INJECT_RETURN(ERRINJ_WAL_IO);
> > + ERROR_INJECT(ERRINJ_WAL_IO, {
> > + entry->res = -1;
> > + if (entry->on_done_cb != NULL)
> > + entry->on_done_cb(entry, entry->on_done_cb_data);
> > + return -1;
> > + });
> >
> > if (writer->is_in_rollback) {
> >
> > /*
> >
> > @@ -1171,6 +1178,9 @@ wal_write(struct journal *journal, struct journal_entry *entry)
> > say_error("Aborting transaction %llu during "
> >
> > "cascading rollback",
> > vclock_sum(&writer->vclock));
> >
> > + entry->res = -1;
> > + if (entry->on_done_cb != NULL)
> > + entry->on_done_cb(entry, entry->on_done_cb_data);
>
> Could you please add 'goto fail' so as not to duplicate code.
>
> I would also add a helper function journal_entry_complete() that would
> call the callback - would look neater that way IMO.
Accepted
>
> > return -1;
> >
> > }
> >
> > @@ -1185,6 +1195,9 @@ wal_write(struct journal *journal, struct journal_entry *entry)
> > if (batch == NULL) {
> >
> > diag_set(OutOfMemory, sizeof(struct wal_msg),
> >
> > "region", "struct wal_msg");
> >
> > + entry->res = -1;
> > + if (entry->on_done_cb != NULL)
> > + entry->on_done_cb(entry, entry->on_done_cb_data);
> >
> > return -1;
> >
> > }
> > wal_msg_create(batch);
> >
> > @@ -1222,7 +1235,10 @@ wal_write_in_wal_mode_none(struct journal *journal,
> >
> > entry->rows + entry->n_rows);
> >
> > vclock_merge(&writer->vclock, &vclock_diff);
> > vclock_copy(&replicaset.vclock, &writer->vclock);
> >
> > - return vclock_sum(&writer->vclock);
> > + entry->res = vclock_sum(&writer->vclock);
> > + if (entry->on_done_cb)
> > + entry->on_done_cb(entry, entry->on_done_cb_data);
> > + return entry->res;
> >
> > }
-------------- next part --------------
A non-text attachment was scrubbed...
Name: signature.asc
Type: application/pgp-signature
Size: 488 bytes
Desc: This is a digitally signed message part.
URL: <https://lists.tarantool.org/pipermail/tarantool-patches/attachments/20190620/d52e5fa3/attachment.sig>
More information about the Tarantool-patches
mailing list