[Tarantool-patches] [PATCH 10/10] box/journal: redesign sync and async writes
Konstantin Osipov
kostja.osipov at gmail.com
Sat Mar 7 00:48:01 MSK 2020
You're on track overall, please see comments inline.
* Cyrill Gorcunov <gorcunov at gmail.com> [20/03/05 15:32]:
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -322,14 +322,14 @@ recovery_journal_write(struct journal *base,
> {
> struct recovery_journal *journal = (struct recovery_journal *) base;
> entry->res = vclock_sum(journal->vclock);
> - journal_entry_complete(entry);
> return 0;
Yes!
> }
>
> static inline void
> recovery_journal_create(struct recovery_journal *journal, struct vclock *v)
> {
> - journal_create(&journal->base, recovery_journal_write, NULL);
> + journal_create(&journal->base, journal_no_write_async,
> + recovery_journal_write, NULL);
Nice.
> journal->vclock = v;
> journal_set(&journal->base);
> }
> diff --git a/src/box/journal.c b/src/box/journal.c
> index 266ee5d1f..2f78a4bc4 100644
> --- a/src/box/journal.c
> +++ b/src/box/journal.c
> @@ -32,6 +32,21 @@
> #include <small/region.h>
> #include <diag.h>
>
> +int
> +journal_no_write_async(struct journal *journal,
> + struct journal_entry *entry,
> + journal_entry_complete_cb on_complete_cb,
> + void *on_complete_cb_data)
> +{
> + (void)journal;
> + (void)entry;
> + (void)on_complete_cb;
> + (void)on_complete_cb_data;
> +
> + say_error("journal: write_async called from invalid context");
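Please panic here instead of just logging an error and returning -1: reaching this function is a programming error.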
> + return -1;
> +}
> +
> /**
> * Used to load from a memtx snapshot. LSN is not used,
> * but txn_commit() must work.
> @@ -41,13 +56,12 @@ dummy_journal_write(struct journal *journal, struct journal_entry *entry)
> {
> (void) journal;
> entry->res = 0;
> - journal_entry_complete(entry);
Nice!
> --- a/src/box/journal.h
> +++ b/src/box/journal.h
> @@ -34,6 +34,7 @@
> #include <stdbool.h>
> #include "salad/stailq.h"
> #include "fiber.h"
> +#include "txn.h"
Stray include. Please remove.
> /**
> - * Send a single entry to write.
> + * Write a single entry to the journal in synchronous way.
> *
> - * @return 0 if write was scheduled or -1 in case of an error.
> + * @return 0 if write was processed by a backend or -1 in case of an error.
> */
> static inline int
> journal_write(struct journal_entry *entry)
> {
> + assert(in_txn() != NULL);
You should not create a dependency loop (journal.h including txn.h) just because of an assert.
> static inline bool
> journal_is_initialized(struct journal *journal)
> {
> - return journal->write != NULL;
> + return journal->write != NULL &&
> + journal->write_async != NULL;
This is an unnecessary change: they are always set together.
Please feel free to add an is_initialized flag if you don't like
checking a single member.
> static void
> -txn_entry_complete_cb(struct journal_entry *entry, void *data)
> +txn_async_complete(struct journal_entry *entry, void *data)
> {
Yes!
> struct txn *txn = data;
> txn->signature = entry->res;
> @@ -478,6 +478,10 @@ txn_entry_complete_cb(struct journal_entry *entry, void *data)
> fiber_set_txn(fiber(), NULL);
> }
>
> +/**
> + * Allocate new journal entry with transaction
> + * data to write.
> + */
> static struct journal_entry *
> txn_journal_entry_new(struct txn *txn)
> {
> @@ -518,24 +522,6 @@ txn_journal_entry_new(struct txn *txn)
> return req;
> }
>
> -static int64_t
> -txn_write_to_wal(struct journal_entry *req)
> -{
> - /*
> - * Send the entry to the journal.
> - *
> - * After this point the transaction must not be used
> - * so reset the corresponding key in the fiber storage.
> - */
> - fiber_set_txn(fiber(), NULL);
> - if (journal_write(req) < 0) {
> - diag_set(ClientError, ER_WAL_IO);
> - diag_log();
> - return -1;
> - }
> - return 0;
> -}
Awesome.
> -
> /*
> * Prepare a transaction using engines.
> */
> @@ -596,42 +582,51 @@ txn_commit_nop(struct txn *txn)
> return false;
> }
>
> +/**
> + * Commit a transaction asynchronously, the
> + * completion is processed by a callback.
> + */
> int
> txn_commit_async(struct txn *txn)
> {
> struct journal_entry *req;
>
> - if (txn_prepare(txn) != 0) {
> - txn_rollback(txn);
> - return -1;
> - }
> + if (txn_prepare(txn) != 0)
> + goto out_rollback;
Why not simply name the label `rollback:` or `out:`?
`out_rollback` suggests that there are more `out_*` labels.
>
> if (txn_commit_nop(txn))
> return 0;
>
> req = txn_journal_entry_new(txn);
> - if (req == NULL) {
> - txn_rollback(txn);
> - return -1;
> + if (req == NULL)
> + goto out_rollback;
> +
> + if (journal_write_async(req, txn_async_complete, txn) != 0) {
> + diag_set(ClientError, ER_WAL_IO);
> + diag_log();
> + goto out_rollback;
> }
> - req->on_complete_cb = txn_entry_complete_cb;
> - req->on_complete_cb_data = txn;
>
> - return txn_write_to_wal(req);
> + return 0;
> +
> +out_rollback:
> + txn_rollback(txn);
> + return -1;
> }
>
> +/**
> + * Commit a transaction synchronously.
> + */
> int
> txn_commit(struct txn *txn)
> {
> struct journal_entry *req;
> - int res = -1;
> + int res;
>
> txn->fiber = fiber();
I believe you don't have to set it twice: it's already set.
>
> - if (txn_prepare(txn) != 0) {
> - txn_rollback(txn);
> - goto out;
> - }
> + if (txn_prepare(txn) != 0)
> + goto out_rollback;
>
> if (txn_commit_nop(txn)) {
> res = 0;
> @@ -639,33 +634,40 @@ txn_commit(struct txn *txn)
> }
>
> req = txn_journal_entry_new(txn);
> - if (req == NULL) {
> - txn_rollback(txn);
> - goto out;
> - }
> - req->on_complete_cb = txn_entry_complete_cb;
> - req->on_complete_cb_data = txn;
> -
> - if (txn_write_to_wal(req) != 0)
> - return -1;
> + if (req == NULL)
> + goto out_rollback;
>
> /*
> - * In case of non-yielding journal the transaction could already
> - * be done and there is nothing to wait in such cases.
> + * FIXME: Move error setup inside the
> + * journal engine itself. The ClientError
> + * here is too general.
> */
> - if (!txn_has_flag(txn, TXN_IS_DONE)) {
> - bool cancellable = fiber_set_cancellable(false);
> - fiber_yield();
> - fiber_set_cancellable(cancellable);
> +
> + if (journal_write(req) != 0) {
> + diag_set(ClientError, ER_WAL_IO);
> + diag_log();
> + goto out_rollback;
> }
> +
> + txn->signature = req->res;
> res = txn->signature >= 0 ? 0 : -1;
> - if (res != 0)
> + if (res != 0) {
> diag_set(ClientError, ER_WAL_IO);
> + diag_log();
> + }
>
> + txn_complete(txn);
> + fiber_set_txn(fiber(), NULL);
> out:
> +
> /* Synchronous transactions are freed by the calling fiber. */
> txn_free(txn);
> return res;
> +
> +out_rollback:
> + res = -1;
> + txn_rollback(txn);
> + goto out;
> }
>
> void
> diff --git a/src/box/wal.c b/src/box/wal.c
> index 1668c9348..dd9563f31 100644
> --- a/src/box/wal.c
> +++ b/src/box/wal.c
> @@ -32,6 +32,7 @@
>
> #include "vclock.h"
> #include "fiber.h"
> +#include "txn.h"
Noooo.
Please avoid dependency loops: wal.c should not include txn.h.
> #include "fio.h"
> #include "errinj.h"
> #include "error.h"
> @@ -60,11 +61,19 @@ const char *wal_mode_STRS[] = { "none", "write", "fsync", NULL };
>
> int wal_dir_lock = -1;
>
> +static int
> +wal_write_async(struct journal *, struct journal_entry *,
> + journal_entry_complete_cb, void *);
> +
> static int
> wal_write(struct journal *, struct journal_entry *);
>
> static int
> -wal_write_in_wal_mode_none(struct journal *, struct journal_entry *);
> +wal_write_none_async(struct journal *, struct journal_entry *,
> + journal_entry_complete_cb, void *);
> +
> +static int
> +wal_write_none(struct journal *, struct journal_entry *);
>
> /*
> * WAL writer - maintain a Write Ahead Log for every change
> @@ -349,8 +358,12 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
> {
> writer->wal_mode = wal_mode;
> writer->wal_max_size = wal_max_size;
> - journal_create(&writer->base, wal_mode == WAL_NONE ?
> - wal_write_in_wal_mode_none : wal_write, NULL);
> + journal_create(&writer->base,
> + wal_mode == WAL_NONE ?
> + wal_write_none_async : wal_write_async,
> + wal_mode == WAL_NONE ?
> + wal_write_none : wal_write,
> + NULL);
>
> struct xlog_opts opts = xlog_opts_default;
> opts.sync_is_async = true;
> @@ -1170,9 +1183,21 @@ wal_writer_f(va_list ap)
> * to be written to disk.
> */
> static int
> -wal_write(struct journal *journal, struct journal_entry *entry)
> +wal_write_async(struct journal *journal, struct journal_entry *entry,
> + journal_entry_complete_cb on_complete_cb,
> + void *on_complete_cb_data)
> {
> struct wal_writer *writer = (struct wal_writer *) journal;
> + struct txn *txn = in_txn();
> +
> + /*
> + * After this point the transaction will
> + * live on its own and processed via callbacks,
> + * so reset the fiber storage.
> + */
> + entry->on_complete_cb = on_complete_cb;
> + entry->on_complete_cb_data = on_complete_cb_data;
> + fiber_set_txn(fiber(), NULL);
This should be in txn.cc, why did you move it here?
>
> ERROR_INJECT(ERRINJ_WAL_IO, {
> goto fail;
> @@ -1220,27 +1245,90 @@ wal_write(struct journal *journal, struct journal_entry *entry)
> return 0;
>
> fail:
> + /*
> + * Don't forget to restore transaction
> + * in a fiber storage: the caller should
> + * be able to run a rollback procedure.
> + */
> + fiber_set_txn(fiber(), txn);
This should not be needed once the fiber_set_txn() call is moved back to txn.cc.
> entry->res = -1;
> - journal_entry_complete(entry);
> + txn->signature = -1;
> return -1;
> }
>
> +static void
> +wal_write_cb(struct journal_entry *entry, void *data)
> +{
> + struct txn *txn = data;
> + (void)entry;
> +
> + /*
> + * On synchronous write just wake up
> + * the waiter which will complete the
> + * transaction.
> + */
> + fiber_wakeup(txn->fiber);
Why not pass the fiber as the callback data? This would avoid having
to include txn.h.
> +}
> +
> +/*
> + * Queue entry to write and wait until it processed.
> + */
> static int
> -wal_write_in_wal_mode_none(struct journal *journal,
> - struct journal_entry *entry)
> +wal_write(struct journal *journal, struct journal_entry *entry)
> {
> - struct wal_writer *writer = (struct wal_writer *) journal;
> + struct txn *txn = in_txn();
> +
> + /*
> + * Lets reuse async WAL engine to shrink code a bit.
> + */
> + if (wal_write_async(journal, entry, wal_write_cb, txn) != 0)
Replace txn with fiber_self() here, i.e. pass the current fiber as the callback data.
> + return -1;
> +
> + bool cancellable = fiber_set_cancellable(false);
> + fiber_yield();
> + fiber_set_cancellable(cancellable);
> +
> + /*
> + * Unlike async write we preserve the transaction
> + * in a fiber storage where the caller should finish
> + * the transaction.
> + */
> + fiber_set_txn(fiber(), txn);
> + return 0;
> +}
> +
> +static int
> +wal_write_none_async(struct journal *journal,
> + struct journal_entry *entry,
> + journal_entry_complete_cb on_complete_cb,
> + void *on_complete_cb_data)
> +{
> + struct wal_writer *writer = (struct wal_writer *)journal;
> struct vclock vclock_diff;
> + struct txn *txn = in_txn();
> +
> + (void)on_complete_cb;
> + (void)on_complete_cb_data;
> +
> + fiber_set_txn(fiber(), NULL);
> +
> vclock_create(&vclock_diff);
> wal_assign_lsn(&vclock_diff, &writer->vclock, entry->rows,
> entry->rows + entry->n_rows);
> vclock_merge(&writer->vclock, &vclock_diff);
> vclock_copy(&replicaset.vclock, &writer->vclock);
> entry->res = vclock_sum(&writer->vclock);
> - journal_entry_complete(entry);
> +
> + txn->signature = entry->res;
Please move this assignment back to txn.cc.
> return 0;
> }
>
> +static int
> +wal_write_none(struct journal *journal, struct journal_entry *entry)
> +{
> + return wal_write_none_async(journal, entry, NULL, NULL);
> +}
> +
>
--
Konstantin Osipov, Moscow, Russia
More information about the Tarantool-patches
mailing list