[Tarantool-patches] [PATCH 10/10] box/journal: redesign sync and async writes

Konstantin Osipov kostja.osipov at gmail.com
Sat Mar 7 00:48:01 MSK 2020


You're on track overall, please see comments inline.

* Cyrill Gorcunov <gorcunov at gmail.com> [20/03/05 15:32]:
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -322,14 +322,14 @@ recovery_journal_write(struct journal *base,
>  {
>  	struct recovery_journal *journal = (struct recovery_journal *) base;
>  	entry->res = vclock_sum(journal->vclock);
> -	journal_entry_complete(entry);
>  	return 0;

Yes!

>  }
>  
>  static inline void
>  recovery_journal_create(struct recovery_journal *journal, struct vclock *v)
>  {
> -	journal_create(&journal->base, recovery_journal_write, NULL);
> +	journal_create(&journal->base, journal_no_write_async,
> +		       recovery_journal_write, NULL);

Nice.

>  	journal->vclock = v;
>  	journal_set(&journal->base);
>  }
> diff --git a/src/box/journal.c b/src/box/journal.c
> index 266ee5d1f..2f78a4bc4 100644
> --- a/src/box/journal.c
> +++ b/src/box/journal.c
> @@ -32,6 +32,21 @@
>  #include <small/region.h>
>  #include <diag.h>
>  
> +int
> +journal_no_write_async(struct journal *journal,
> +		       struct journal_entry *entry,
> +		       journal_entry_complete_cb on_complete_cb,
> +		       void *on_complete_cb_data)
> +{
> +	(void)journal;
> +	(void)entry;
> +	(void)on_complete_cb;
> +	(void)on_complete_cb_data;
> +
> +	say_error("journal: write_async called from invalid context");

Please panic.

> +	return -1;
> +}
> +
>  /**
>   * Used to load from a memtx snapshot. LSN is not used,
>   * but txn_commit() must work.
> @@ -41,13 +56,12 @@ dummy_journal_write(struct journal *journal, struct journal_entry *entry)
>  {
>  	(void) journal;
>  	entry->res = 0;
> -	journal_entry_complete(entry);

Nice!

> --- a/src/box/journal.h
> +++ b/src/box/journal.h
> @@ -34,6 +34,7 @@
>  #include <stdbool.h>
>  #include "salad/stailq.h"
>  #include "fiber.h"
> +#include "txn.h"

Stray include. Please remove.

>  /**
> - * Send a single entry to write.
> + * Write a single entry to the journal in synchronous way.
>   *
> - * @return 0 if write was scheduled or -1 in case of an error.
> + * @return 0 if write was processed by a backend or -1 in case of an error.
>   */
>  static inline int
>  journal_write(struct journal_entry *entry)
>  {
> +	assert(in_txn() != NULL);

You should not create a dependency loop just because of an assert.

>  static inline bool
>  journal_is_initialized(struct journal *journal)
>  {
> -	return journal->write != NULL;
> +	return journal->write != NULL &&
> +		journal->write_async != NULL;

This is an unnecessary change. They are always set together.
Please feel free to add a flag is_initialized if you don't like
checking a single member.

>  static void
> -txn_entry_complete_cb(struct journal_entry *entry, void *data)
> +txn_async_complete(struct journal_entry *entry, void *data)
>  {

Yes!

>  	struct txn *txn = data;
>  	txn->signature = entry->res;
> @@ -478,6 +478,10 @@ txn_entry_complete_cb(struct journal_entry *entry, void *data)
>  	fiber_set_txn(fiber(), NULL);
>  }
>  
> +/**
> + * Allocate new journal entry with transaction
> + * data to write.
> + */
>  static struct journal_entry *
>  txn_journal_entry_new(struct txn *txn)
>  {
> @@ -518,24 +522,6 @@ txn_journal_entry_new(struct txn *txn)
>  	return req;
>  }
>  
> -static int64_t
> -txn_write_to_wal(struct journal_entry *req)
> -{
> -	/*
> -	 * Send the entry to the journal.
> -	*
> -	 * After this point the transaction must not be used
> -	 * so reset the corresponding key in the fiber storage.
> -	 */
> -	fiber_set_txn(fiber(), NULL);
> -	if (journal_write(req) < 0) {
> -		diag_set(ClientError, ER_WAL_IO);
> -		diag_log();
> -		return -1;
> -	}
> -	return 0;
> -}

Awesome.

> -
>  /*
>   * Prepare a transaction using engines.
>   */
> @@ -596,42 +582,51 @@ txn_commit_nop(struct txn *txn)
>  	return false;
>  }
>  
> +/**
> + * Commit a transaction asynchronously, the
> + * completion is processed by a callback.
> + */
>  int
>  txn_commit_async(struct txn *txn)
>  {
>  	struct journal_entry *req;
>  
> -	if (txn_prepare(txn) != 0) {
> -		txn_rollback(txn);
> -		return -1;
> -	}
> +	if (txn_prepare(txn) != 0)
> +		goto out_rollback;

Why not simply rollback: or out:?
out_rollback suggests there are more out_* labels.

>  
>  	if (txn_commit_nop(txn))
>  		return 0;
>  
>  	req = txn_journal_entry_new(txn);
> -	if (req == NULL) {
> -		txn_rollback(txn);
> -		return -1;
> +	if (req == NULL)
> +		goto out_rollback;
> +
> +	if (journal_write_async(req, txn_async_complete, txn) != 0) {
> +		diag_set(ClientError, ER_WAL_IO);
> +		diag_log();
> +		goto out_rollback;
>  	}
> -	req->on_complete_cb = txn_entry_complete_cb;
> -	req->on_complete_cb_data = txn;
>  
> -	return txn_write_to_wal(req);
> +	return 0;
> +
> +out_rollback:
> +	txn_rollback(txn);
> +	return -1;
>  }
>  
> +/**
> + * Commit a transaction synchronously.
> + */
>  int
>  txn_commit(struct txn *txn)
>  {
>  	struct journal_entry *req;
> -	int res = -1;
> +	int res;
>  
>  	txn->fiber = fiber();

I believe you don't have to set it twice, it's already set.

>  
> -	if (txn_prepare(txn) != 0) {
> -		txn_rollback(txn);
> -		goto out;
> -	}
> +	if (txn_prepare(txn) != 0)
> +		goto out_rollback;
>  
>  	if (txn_commit_nop(txn)) {
>  		res = 0;
> @@ -639,33 +634,40 @@ txn_commit(struct txn *txn)
>  	}
>  
>  	req = txn_journal_entry_new(txn);
> -	if (req == NULL) {
> -		txn_rollback(txn);
> -		goto out;
> -	}
> -	req->on_complete_cb = txn_entry_complete_cb;
> -	req->on_complete_cb_data = txn;
> -
> -	if (txn_write_to_wal(req) != 0)
> -		return -1;
> +	if (req == NULL)
> +		goto out_rollback;
>  
>  	/*
> -	 * In case of non-yielding journal the transaction could already
> -	 * be done and there is nothing to wait in such cases.
> +	 * FIXME: Move error setup inside the
> +	 * journal engine itself. The ClientError
> +	 * here is too general.
>  	 */
> -	if (!txn_has_flag(txn, TXN_IS_DONE)) {
> -		bool cancellable = fiber_set_cancellable(false);
> -		fiber_yield();
> -		fiber_set_cancellable(cancellable);
> +
> +	if (journal_write(req) != 0) {
> +		diag_set(ClientError, ER_WAL_IO);
> +		diag_log();
> +		goto out_rollback;
>  	}
> +
> +	txn->signature = req->res;
>  	res = txn->signature >= 0 ? 0 : -1;
> -	if (res != 0)
> +	if (res != 0) {
>  		diag_set(ClientError, ER_WAL_IO);
> +		diag_log();
> +	}
>  
> +	txn_complete(txn);
> +	fiber_set_txn(fiber(), NULL);
>  out:
> +
>  	/* Synchronous transactions are freed by the calling fiber. */
>  	txn_free(txn);
>  	return res;
> +
> +out_rollback:
> +	res = -1;
> +	txn_rollback(txn);
> +	goto out;
>  }


>  
>  void
> diff --git a/src/box/wal.c b/src/box/wal.c
> index 1668c9348..dd9563f31 100644
> --- a/src/box/wal.c
> +++ b/src/box/wal.c
> @@ -32,6 +32,7 @@
>  
>  #include "vclock.h"
>  #include "fiber.h"
> +#include "txn.h"

Noooo.

Please avoid dependency loops.

>  #include "fio.h"
>  #include "errinj.h"
>  #include "error.h"
> @@ -60,11 +61,19 @@ const char *wal_mode_STRS[] = { "none", "write", "fsync", NULL };
>  
>  int wal_dir_lock = -1;
>  
> +static int
> +wal_write_async(struct journal *, struct journal_entry *,
> +		journal_entry_complete_cb, void *);
> +
>  static int
>  wal_write(struct journal *, struct journal_entry *);
>  
>  static int
> -wal_write_in_wal_mode_none(struct journal *, struct journal_entry *);
> +wal_write_none_async(struct journal *, struct journal_entry *,
> +		     journal_entry_complete_cb, void *);
> +
> +static int
> +wal_write_none(struct journal *, struct journal_entry *);
>  
>  /*
>   * WAL writer - maintain a Write Ahead Log for every change
> @@ -349,8 +358,12 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
>  {
>  	writer->wal_mode = wal_mode;
>  	writer->wal_max_size = wal_max_size;
> -	journal_create(&writer->base, wal_mode == WAL_NONE ?
> -		       wal_write_in_wal_mode_none : wal_write, NULL);
> +	journal_create(&writer->base,
> +		       wal_mode == WAL_NONE ?
> +		       wal_write_none_async : wal_write_async,
> +		       wal_mode == WAL_NONE ?
> +		       wal_write_none : wal_write,
> +		       NULL);
>  
>  	struct xlog_opts opts = xlog_opts_default;
>  	opts.sync_is_async = true;
> @@ -1170,9 +1183,21 @@ wal_writer_f(va_list ap)
>   * to be written to disk.
>   */
>  static int
> -wal_write(struct journal *journal, struct journal_entry *entry)
> +wal_write_async(struct journal *journal, struct journal_entry *entry,
> +		journal_entry_complete_cb on_complete_cb,
> +		void *on_complete_cb_data)
>  {
>  	struct wal_writer *writer = (struct wal_writer *) journal;
> +	struct txn *txn = in_txn();
> +
> +	/*
> +	 * After this point the transaction will
> +	 * live on its own and processed via callbacks,
> +	 * so reset the fiber storage.
> +	 */
> +	entry->on_complete_cb = on_complete_cb;
> +	entry->on_complete_cb_data = on_complete_cb_data;
> +	fiber_set_txn(fiber(), NULL);

This should be in txn.cc, why did you move it here?

>  
>  	ERROR_INJECT(ERRINJ_WAL_IO, {
>  		goto fail;
> @@ -1220,27 +1245,90 @@ wal_write(struct journal *journal, struct journal_entry *entry)
>  	return 0;
>  
>  fail:
> +	/*
> +	 * Don't forget to restore transaction
> +	 * in a fiber storage: the caller should
> +	 * be able to run a rollback procedure.
> +	 */
> +	fiber_set_txn(fiber(), txn);

This should not be needed now.

>  	entry->res = -1;
> -	journal_entry_complete(entry);
> +	txn->signature = -1;
>  	return -1;
>  }
>  
> +static void
> +wal_write_cb(struct journal_entry *entry, void *data)
> +{
> +	struct txn *txn = data;
> +	(void)entry;
> +
> +	/*
> +	 * On synchronous write just wake up
> +	 * the waiter which will complete the
> +	 * transaction.
> +	 */
> +	fiber_wakeup(txn->fiber);

Why not pass the fiber as data? This would avoid having to include
txn.h.

> +}
> +
> +/*
> + * Queue entry to write and wait until it processed.
> + */
>  static int
> -wal_write_in_wal_mode_none(struct journal *journal,
> -			   struct journal_entry *entry)
> +wal_write(struct journal *journal, struct journal_entry *entry)
>  {
> -	struct wal_writer *writer = (struct wal_writer *) journal;
> +	struct txn *txn = in_txn();
> +
> +	/*
> +	 * Lets reuse async WAL engine to shrink code a bit.
> +	 */
> +	if (wal_write_async(journal, entry, wal_write_cb, txn) != 0)

txn -> fiber_self().
> +		return -1;
> +
> +	bool cancellable = fiber_set_cancellable(false);
> +	fiber_yield();
> +	fiber_set_cancellable(cancellable);
> +
> +	/*
> +	 * Unlike async write we preserve the transaction
> +	 * in a fiber storage where the caller should finish
> +	 * the transaction.
> +	 */
> +	fiber_set_txn(fiber(), txn);
> +	return 0;
> +}
> +
> +static int
> +wal_write_none_async(struct journal *journal,
> +		     struct journal_entry *entry,
> +		     journal_entry_complete_cb on_complete_cb,
> +		     void *on_complete_cb_data)
> +{
> +	struct wal_writer *writer = (struct wal_writer *)journal;
>  	struct vclock vclock_diff;
> +	struct txn *txn = in_txn();
> +
> +	(void)on_complete_cb;
> +	(void)on_complete_cb_data;
> +
> +	fiber_set_txn(fiber(), NULL);
> +
>  	vclock_create(&vclock_diff);
>  	wal_assign_lsn(&vclock_diff, &writer->vclock, entry->rows,
>  		       entry->rows + entry->n_rows);
>  	vclock_merge(&writer->vclock, &vclock_diff);
>  	vclock_copy(&replicaset.vclock, &writer->vclock);
>  	entry->res = vclock_sum(&writer->vclock);
> -	journal_entry_complete(entry);
> +
> +	txn->signature = entry->res;


Please move this assignment back to txn.cc.

>  	return 0;
>  }
>  
> +static int
> +wal_write_none(struct journal *journal, struct journal_entry *entry)
> +{
> +	return wal_write_none_async(journal, entry, NULL, NULL);
> +}
> +

> 

-- 
Konstantin Osipov, Moscow, Russia


More information about the Tarantool-patches mailing list