[tarantool-patches] [PATCH v4 6/9] wal: introduce a journal entry finalization callback
Vladimir Davydov
vdavydov.dev at gmail.com
Thu Jun 20 17:08:48 MSK 2019
On Thu, Jun 20, 2019 at 12:23:13AM +0300, Georgy Kirichenko wrote:
> Finalize a transaction thorough a journal entry callback. So transaction
> processing doesn't rely on fiber schedule. This also enforce transaction
> finalization order as triggers might fail.
>
> Prerequisites: #1254
> ---
> src/box/alter.cc | 5 +++
> src/box/box.cc | 7 ++-
> src/box/journal.c | 10 ++++-
> src/box/journal.h | 12 ++++-
> src/box/txn.c | 112 ++++++++++++++++++++++++++++++----------------
> src/box/vy_log.c | 3 +-
> src/box/wal.c | 20 ++++++++-
> 7 files changed, 122 insertions(+), 47 deletions(-)
In general, looks fine to me. A few minor comments below.
>
> diff --git a/src/box/alter.cc b/src/box/alter.cc
> index a37a68ce4..aa6a79264 100644
> --- a/src/box/alter.cc
> +++ b/src/box/alter.cc
> @@ -3558,6 +3558,11 @@ unlock_after_dd(struct trigger *trigger, void *event)
> {
> (void) trigger;
> (void) event;
> + /*
> + * A trigger could be processed by the wal scheduler fiber
> + * so steal the latch first.
> + */
Not "could be", but "is processed", I guess.
> + latch_steal(&schema_lock);
> latch_unlock(&schema_lock);
> }
>
> diff --git a/src/box/box.cc b/src/box/box.cc
> index 5e5cd2b08..2994363ab 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -309,10 +309,13 @@ struct recovery_journal {
> */
> static int64_t
> recovery_journal_write(struct journal *base,
> - struct journal_entry * /* entry */)
> + struct journal_entry *entry)
> {
> struct recovery_journal *journal = (struct recovery_journal *) base;
> - return vclock_sum(journal->vclock);
> + entry->res = vclock_sum(journal->vclock);
> + if (entry->on_done_cb)
> + entry->on_done_cb(entry, entry->on_done_cb_data);
This can't be - on_done_cb must always be set.
Note, although vy_log uses journal_entry, its entries don't go through this
path.
> + return entry->res;
Shouldn't it return 0? BTW it looks like you need to update the comment
to journal_write().
> }
>
> static inline void
> diff --git a/src/box/journal.c b/src/box/journal.c
> index fe13fb6ee..eb0db9af2 100644
> --- a/src/box/journal.c
> +++ b/src/box/journal.c
> @@ -41,7 +41,9 @@ static int64_t
> dummy_journal_write(struct journal *journal, struct journal_entry *entry)
> {
> (void) journal;
> - (void) entry;
> + entry->res = 0;
> + if (entry->on_done_cb)
> + entry->on_done_cb(entry, entry->on_done_cb_data);
> return 0;
> }
>
> @@ -53,7 +55,9 @@ static struct journal dummy_journal = {
> struct journal *current_journal = &dummy_journal;
>
> struct journal_entry *
> -journal_entry_new(size_t n_rows, struct region *region)
> +journal_entry_new(size_t n_rows, struct region *region,
> + void (*on_done_cb)(struct journal_entry *entry, void *data),
> + void *on_done_cb_data)
> {
> struct journal_entry *entry;
>
> @@ -70,6 +74,8 @@ journal_entry_new(size_t n_rows, struct region *region)
> entry->n_rows = n_rows;
> entry->res = -1;
> entry->fiber = fiber();
> + entry->on_done_cb = on_done_cb;
> + entry->on_done_cb_data = on_done_cb_data;
> return entry;
> }
>
> diff --git a/src/box/journal.h b/src/box/journal.h
> index 8ac32ee5e..b704b5c67 100644
> --- a/src/box/journal.h
> +++ b/src/box/journal.h
> @@ -58,6 +58,14 @@ struct journal_entry {
> * The fiber issuing the request.
> */
> struct fiber *fiber;
> + /**
> + * A journal entry completion callback.
> + */
I think we should elaborate the comment. When is this callback called?
Is it called on failure? Is it called from the same fiber or some other?
> + void (*on_done_cb)(struct journal_entry *entry, void *data);
Let's please add a typedef for on_done_cb - you use its signature pretty
often.
> + /**
> + * A journal entry completion callback argument.
> + */
> + void *on_done_cb_data;
> /**
> * Approximate size of this request when encoded.
> */
> @@ -80,7 +88,9 @@ struct region;
> * @return NULL if out of memory, fiber diagnostics area is set
> */
> struct journal_entry *
> -journal_entry_new(size_t n_rows, struct region *region);
> +journal_entry_new(size_t n_rows, struct region *region,
> + void (*on_done_cb)(struct journal_entry *entry, void *data),
> + void *on_done_cb_data);
>
> /**
> * An API for an abstract journal for all transactions of this
> diff --git a/src/box/txn.c b/src/box/txn.c
> index 21f7e98b4..52e16f3e6 100644
> --- a/src/box/txn.c
> +++ b/src/box/txn.c
> @@ -337,6 +337,66 @@ fail:
> return -1;
> }
>
> +/**
> + * Complete transaction processing.
> + */
> +static void
> +txn_complete(struct txn *txn)
> +{
> + if (txn->signature < 0) {
> + if (txn->engine)
> + engine_rollback(txn->engine, txn);
> + /*
> + * Some of triggers require for in_txn variable is set so
> + * restore it for time a trigger is in progress.
> + */
> + fiber_set_txn(fiber(), txn);
> + /* Rollback triggers must not throw. */
> + if (txn->has_triggers &&
> + trigger_run(&txn->on_rollback, txn) != 0) {
> + diag_log();
> + unreachable();
> + panic("rollback trigger failed");
> + }
> + fiber_set_txn(fiber(), NULL);
> +
> + return;
> + }
> + /*
> + * Engine can be NULL if transaction contains IPROTO_NOP
> + * statements only.
> + */
> + if (txn->engine != NULL)
> + engine_commit(txn->engine, txn);
> + /*
> + * Some of triggers require for in_txn variable is set so
> + * restore it for time a trigger is in progress.
> + */
> + fiber_set_txn(fiber(), txn);
copy-and-paste... Maybe add a helper function for running triggers?
> + /*
> + * The transaction is in the binary log. No action below
> + * may throw. In case an error has happened, there is
> + * no other option but terminate.
> + */
> + if (txn->has_triggers &&
> + trigger_run(&txn->on_commit, txn) != 0) {
> + diag_log();
> + unreachable();
> + panic("commit trigger failed");
> + }
> +
> + fiber_set_txn(fiber(), NULL);
> +}
> +
> +static void
> +txn_entry_done_cb(struct journal_entry *entry, void *data)
> +{
> + struct txn *txn = (struct txn *)data;
> + txn->signature = entry->res;
> + txn_complete(txn);
> +}
> +
> +
> static int64_t
> txn_write_to_wal(struct txn *txn)
> {
> @@ -418,31 +472,20 @@ txn_commit(struct txn *txn)
> }
> trigger_clear(&txn->fiber_on_stop);
>
> + fiber_set_txn(fiber(), NULL);
> if (txn->n_new_rows + txn->n_applier_rows > 0) {
> txn->signature = txn_write_to_wal(txn);
> if (txn->signature < 0)
> return -1;
> + } else {
> + /*
> + * However there is noting to write to wal a completion
s/noting/nothing
punctuation marks missing
> + * should be fired.
> + */
> + txn->signature = 0;
> + txn_complete(txn);
> }
> - /*
> - * Engine can be NULL if transaction contains IPROTO_NOP
> - * statements only.
> - */
> - if (txn->engine != NULL)
> - engine_commit(txn->engine, txn);
> - /*
> - * The transaction is in the binary log. No action below
> - * may throw. In case an error has happened, there is
> - * no other option but terminate.
> - */
> - if (txn->has_triggers &&
> - trigger_run(&txn->on_commit, txn) != 0) {
> - diag_log();
> - unreachable();
> - panic("commit trigger failed");
> - }
> -
>
> - fiber_set_txn(fiber(), NULL);
> txn_free(txn);
> return 0;
> fail:
> diff --git a/src/box/wal.c b/src/box/wal.c
> index 71f6dbb5c..62b6391fd 100644
> --- a/src/box/wal.c
> +++ b/src/box/wal.c
> @@ -263,6 +263,8 @@ tx_schedule_f(va_list ap)
> struct journal_entry *req =
> stailq_shift_entry(&writer->schedule_queue,
> struct journal_entry, fifo);
> + if (req->on_done_cb != NULL)
> + req->on_done_cb(req, req->on_done_cb_data);
> fiber_wakeup(req->fiber);
> }
> writer->is_in_rollback = false;
> @@ -1158,7 +1160,12 @@ wal_write(struct journal *journal, struct journal_entry *entry)
> {
> struct wal_writer *writer = (struct wal_writer *) journal;
>
> - ERROR_INJECT_RETURN(ERRINJ_WAL_IO);
> + ERROR_INJECT(ERRINJ_WAL_IO, {
> + entry->res = -1;
> + if (entry->on_done_cb != NULL)
> + entry->on_done_cb(entry, entry->on_done_cb_data);
> + return -1;
> + });
>
> if (writer->is_in_rollback) {
> /*
> @@ -1171,6 +1178,9 @@ wal_write(struct journal *journal, struct journal_entry *entry)
> say_error("Aborting transaction %llu during "
> "cascading rollback",
> vclock_sum(&writer->vclock));
> + entry->res = -1;
> + if (entry->on_done_cb != NULL)
> + entry->on_done_cb(entry, entry->on_done_cb_data);
Could you please add 'goto fail' so as not to duplicate code?
I would also add a helper function journal_entry_complete() that would
call the callback - would look neater that way IMO.
> return -1;
> }
>
> @@ -1185,6 +1195,9 @@ wal_write(struct journal *journal, struct journal_entry *entry)
> if (batch == NULL) {
> diag_set(OutOfMemory, sizeof(struct wal_msg),
> "region", "struct wal_msg");
> + entry->res = -1;
> + if (entry->on_done_cb != NULL)
> + entry->on_done_cb(entry, entry->on_done_cb_data);
> return -1;
> }
> wal_msg_create(batch);
> @@ -1222,7 +1235,10 @@ wal_write_in_wal_mode_none(struct journal *journal,
> entry->rows + entry->n_rows);
> vclock_merge(&writer->vclock, &vclock_diff);
> vclock_copy(&replicaset.vclock, &writer->vclock);
> - return vclock_sum(&writer->vclock);
> + entry->res = vclock_sum(&writer->vclock);
> + if (entry->on_done_cb)
> + entry->on_done_cb(entry, entry->on_done_cb_data);
> + return entry->res;
> }
More information about the Tarantool-patches
mailing list