[Tarantool-patches] [PATCH v13 11/11] box/journal: redesign journal operations

Cyrill Gorcunov gorcunov at gmail.com
Thu Mar 19 12:05:37 MSK 2020


Currently the journal provides only one method -- write,
which implies a callback to trigger upon write completion
(in contrast to the 1.10 series, where all commits were
processed synchronously).

Let's make the difference between sync and async writes more
explicit: provide a journal::write_async method which takes
callback data as an argument, while journal::write handles
the transaction synchronously.

Redesign notes:

1) The callback for async writes is set once at journal
   creation. There is no need to carry the callback in
   every journal entry. This allows us to save some
   memory

2) Both journal_write and journal_write_async require
   the caller to pass a valid fiber->storage.txn; on
   error the txn must not be reset but preserved so
   that callers are able to run txn_rollback

3) txn_commit and txn_commit_async call txn_rollback
   where appropriate

4) no need to call journal_entry_complete on sync
   writes anymore

5) wal_write_in_wal_mode_none is too long, renamed
   to wal_write_none

Signed-off-by: Cyrill Gorcunov <gorcunov at gmail.com>
---
 src/box/box.cc    |  6 +--
 src/box/journal.c | 39 +++++++++++++++-----
 src/box/journal.h | 94 +++++++++++++++++++++++++++++++++--------------
 src/box/txn.c     | 53 ++++++++++++--------------
 src/box/vy_log.c  |  4 +-
 src/box/wal.c     | 74 ++++++++++++++++++++++++++++++++-----
 6 files changed, 191 insertions(+), 79 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index cf79affca..87ddbeb3a 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -322,7 +322,6 @@ recovery_journal_write(struct journal *base,
 {
 	struct recovery_journal *journal = (struct recovery_journal *) base;
 	entry->res = vclock_sum(journal->vclock);
-	journal_entry_complete(entry);
 	return 0;
 }
 
@@ -330,8 +329,9 @@ static void
 recovery_journal_create(struct vclock *v)
 {
 	static struct recovery_journal journal;
-
-	journal_create(&journal.base, recovery_journal_write, NULL);
+	journal_create(&journal.base, journal_no_write_async,
+		       journal_no_write_async_cb,
+		       recovery_journal_write, NULL);
 	journal.vclock = v;
 	journal_set(&journal.base);
 }
diff --git a/src/box/journal.c b/src/box/journal.c
index 11e78990d..036aad87a 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -32,6 +32,29 @@
 #include <small/region.h>
 #include <diag.h>
 
+int
+journal_no_write_async(struct journal *journal,
+		       struct journal_entry *entry,
+		       void *complete_data)
+{
+	(void)complete_data;
+	(void)journal;
+
+	say_error("journal: write_async invalid context");
+	entry->res = -1;
+	return -1;
+}
+
+void
+journal_no_write_async_cb(struct journal_entry *entry,
+			  void *complete_data)
+{
+	(void)complete_data;
+
+	say_error("journal: write_async_cb invalid context");
+	entry->res = -1;
+}
+
 /**
  * Used to load from a memtx snapshot. LSN is not used,
  * but txn_commit() must work.
@@ -41,21 +64,19 @@ dummy_journal_write(struct journal *journal, struct journal_entry *entry)
 {
 	(void) journal;
 	entry->res = 0;
-	journal_entry_complete(entry);
 	return 0;
 }
 
 static struct journal dummy_journal = {
-	dummy_journal_write,
-	NULL,
+	.write_async	= journal_no_write_async,
+	.write_async_cb	= journal_no_write_async_cb,
+	.write		= dummy_journal_write,
 };
 
 struct journal *current_journal = &dummy_journal;
 
 struct journal_entry *
-journal_entry_new(size_t n_rows, struct region *region,
-		  journal_entry_complete_cb on_complete_cb,
-		  void *on_complete_cb_data)
+journal_entry_new(size_t n_rows, struct region *region)
 {
 	struct journal_entry *entry;
 
@@ -68,11 +89,11 @@ journal_entry_new(size_t n_rows, struct region *region,
 		diag_set(OutOfMemory, size, "region", "struct journal_entry");
 		return NULL;
 	}
+
+	entry->complete_data = NULL;
 	entry->approx_len = 0;
 	entry->n_rows = n_rows;
 	entry->res = -1;
-	entry->on_complete_cb = on_complete_cb;
-	entry->on_complete_cb_data = on_complete_cb_data;
+
 	return entry;
 }
-
diff --git a/src/box/journal.h b/src/box/journal.h
index 64f167c6f..e1947edd1 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -34,6 +34,7 @@
 #include <stdbool.h>
 #include "salad/stailq.h"
 #include "fiber.h"
+#include "txn.h"
 
 #if defined(__cplusplus)
 extern "C" {
@@ -60,17 +61,10 @@ struct journal_entry {
 	 * the committed transaction, on error is -1
 	 */
 	int64_t res;
-	/**
-	 * A journal entry finalization callback which is going to be called
-	 * after the entry processing was finished in both cases: success
-	 * or fail. Entry->res is set to a result value before the callback
-	 * is fired.
-	 */
-	journal_entry_complete_cb on_complete_cb;
 	/**
 	 * A journal entry completion callback argument.
 	 */
-	void *on_complete_cb_data;
+	void *complete_data;
 	/**
 	 * Approximate size of this request when encoded.
 	 */
@@ -93,18 +87,7 @@ struct region;
  * @return NULL if out of memory, fiber diagnostics area is set
  */
 struct journal_entry *
-journal_entry_new(size_t n_rows, struct region *region,
-		  journal_entry_complete_cb on_complete_cb,
-		  void *on_complete_cb_data);
-
-/**
- * Finalize a single entry.
- */
-static inline void
-journal_entry_complete(struct journal_entry *entry)
-{
-	entry->on_complete_cb(entry, entry->on_complete_cb_data);
-}
+journal_entry_new(size_t n_rows, struct region *region);
 
 /**
  * An API for an abstract journal for all transactions of this
@@ -112,11 +95,33 @@ journal_entry_complete(struct journal_entry *entry)
  * synchronous replication.
  */
 struct journal {
+	/** Asynchronous write */
+	int (*write_async)(struct journal *journal,
+			   struct journal_entry *entry,
+			   void *complete_data);
+
+	/** Asynchronous write completion */
+	void (*write_async_cb)(struct journal_entry *entry,
+			       void *complete_data);
+
+	/** Synchronous write */
 	int (*write)(struct journal *journal,
-		     struct journal_entry *req);
+		     struct journal_entry *entry);
+
+	/** Journal destroy */
 	void (*destroy)(struct journal *journal);
 };
 
+/**
+ * Finalize a single entry.
+ */
+static inline void
+journal_async_complete(struct journal *journal, struct journal_entry *entry)
+{
+	assert(journal->write_async_cb != NULL);
+	journal->write_async_cb(entry, entry->complete_data);
+}
+
 /**
  * Depending on the step of recovery and instance configuration
  * points at a concrete implementation of the journal.
@@ -124,16 +129,30 @@ struct journal {
 extern struct journal *current_journal;
 
 /**
- * Send a single entry to write.
+ * Write a single entry to the journal in synchronous way.
  *
- * @return 0 if write was scheduled or -1 in case of an error.
+ * @return 0 if write was processed by a backend or -1 in case of an error.
  */
 static inline int
 journal_write(struct journal_entry *entry)
 {
+	assert(in_txn() != NULL);
 	return current_journal->write(current_journal, entry);
 }
 
+/**
+ * Queue a single entry to the journal in asynchronous way.
+ *
+ * @return 0 if write was queued to a backend or -1 in case of an error.
+ */
+static inline int
+journal_write_async(struct journal_entry *entry, void *complete_data)
+{
+	assert(in_txn() != NULL);
+	return current_journal->write_async(current_journal, entry,
+					    complete_data);
+}
+
 /**
  * Change the current implementation of the journaling API.
  * Happens during life cycle of an instance:
@@ -165,13 +184,34 @@ journal_set(struct journal *new_journal)
 
 static inline void
 journal_create(struct journal *journal,
-	       int (*write)(struct journal *, struct journal_entry *),
-	       void (*destroy)(struct journal *))
+	       int (*write_async)(struct journal *journal,
+				  struct journal_entry *entry,
+				  void *complete_data),
+	       void (*write_async_cb)(struct journal_entry *entry,
+				      void *complete_data),
+	       int (*write)(struct journal *journal,
+			    struct journal_entry *entry),
+	       void (*destroy)(struct journal *journal))
 {
-	journal->write = write;
-	journal->destroy = destroy;
+	journal->write_async	= write_async;
+	journal->write_async_cb	= write_async_cb;
+	journal->write		= write;
+	journal->destroy	= destroy;
 }
 
+/**
+ * A stub to issue an error in case asynchronous
+ * write is disabled in the backend.
+ */
+extern int
+journal_no_write_async(struct journal *journal,
+		       struct journal_entry *entry,
+		       void *complete_data);
+
+extern void
+journal_no_write_async_cb(struct journal_entry *entry,
+			  void *complete_data);
+
 static inline bool
 journal_is_initialized(struct journal *journal)
 {
diff --git a/src/box/txn.c b/src/box/txn.c
index 11c20aceb..33302586d 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -487,7 +487,7 @@ txn_journal_entry_new(struct txn *txn)
 	assert(txn->n_new_rows + txn->n_applier_rows > 0);
 
 	req = journal_entry_new(txn->n_new_rows + txn->n_applier_rows,
-				&txn->region, txn_complete_async, txn);
+				&txn->region);
 	if (req == NULL)
 		return NULL;
 
@@ -518,24 +518,6 @@ txn_journal_entry_new(struct txn *txn)
 	return req;
 }
 
-static int64_t
-txn_write_to_wal(struct journal_entry *req)
-{
-	/*
-	 * Send the entry to the journal.
-	 *
-	 * After this point the transaction must not be used
-	 * so reset the corresponding key in the fiber storage.
-	 */
-	fiber_set_txn(fiber(), NULL);
-	if (journal_write(req) < 0) {
-		diag_set(ClientError, ER_WAL_IO);
-		diag_log();
-		return -1;
-	}
-	return 0;
-}
-
 /*
  * Prepare a transaction using engines.
  */
@@ -608,7 +590,15 @@ txn_commit_async(struct txn *txn)
 		return -1;
 	}
 
-	return txn_write_to_wal(req);
+	if (journal_write_async(req, txn) != 0) {
+		txn_rollback(txn);
+
+		diag_set(ClientError, ER_WAL_IO);
+		diag_log();
+		return -1;
+	}
+
+	return 0;
 }
 
 int
@@ -636,21 +626,26 @@ txn_commit(struct txn *txn)
 		return -1;
 	}
 
-	if (txn_write_to_wal(req) != 0)
+	if (journal_write(req) != 0) {
+		txn_rollback(txn);
+		txn_free(txn);
+
+		diag_set(ClientError, ER_WAL_IO);
+		diag_log();
 		return -1;
+	}
 
-	/*
-	 * In case of non-yielding journal the transaction could already
-	 * be done and there is nothing to wait in such cases.
-	 */
 	if (!txn_has_flag(txn, TXN_IS_DONE)) {
-		bool cancellable = fiber_set_cancellable(false);
-		fiber_yield();
-		fiber_set_cancellable(cancellable);
+		txn->signature = req->res;
+		txn_complete(txn);
+		fiber_set_txn(fiber(), NULL);
 	}
+
 	int res = txn->signature >= 0 ? 0 : -1;
-	if (res != 0)
+	if (res != 0) {
 		diag_set(ClientError, ER_WAL_IO);
+		diag_log();
+	}
 
 	/* Synchronous transactions are freed by the calling fiber. */
 	txn_free(txn);
diff --git a/src/box/vy_log.c b/src/box/vy_log.c
index cb291f3c8..92171ec21 100644
--- a/src/box/vy_log.c
+++ b/src/box/vy_log.c
@@ -815,8 +815,8 @@ vy_log_tx_flush(struct vy_log_tx *tx)
 		tx_size++;
 
 	size_t used = region_used(&fiber()->gc);
-	struct journal_entry *entry = journal_entry_new(tx_size, &fiber()->gc,
-							NULL, NULL);
+
+	struct journal_entry *entry = journal_entry_new(tx_size, &fiber()->gc);
 	if (entry == NULL)
 		goto err;
 
diff --git a/src/box/wal.c b/src/box/wal.c
index 1668c9348..ba9f22e7a 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -32,6 +32,7 @@
 
 #include "vclock.h"
 #include "fiber.h"
+#include "txn.h"
 #include "fio.h"
 #include "errinj.h"
 #include "error.h"
@@ -60,11 +61,17 @@ const char *wal_mode_STRS[] = { "none", "write", "fsync", NULL };
 
 int wal_dir_lock = -1;
 
+static int
+wal_write_async(struct journal *, struct journal_entry *, void *);
+
 static int
 wal_write(struct journal *, struct journal_entry *);
 
 static int
-wal_write_in_wal_mode_none(struct journal *, struct journal_entry *);
+wal_write_none_async(struct journal *, struct journal_entry *, void *);
+
+static int
+wal_write_none(struct journal *, struct journal_entry *);
 
 /*
  * WAL writer - maintain a Write Ahead Log for every change
@@ -253,9 +260,10 @@ xlog_write_entry(struct xlog *l, struct journal_entry *entry)
 static void
 tx_schedule_queue(struct stailq *queue)
 {
+	struct wal_writer *writer = &wal_writer_singleton;
 	struct journal_entry *req, *tmp;
 	stailq_foreach_entry_safe(req, tmp, queue, fifo)
-		journal_entry_complete(req);
+		journal_async_complete(&writer->base, req);
 }
 
 /**
@@ -349,8 +357,14 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
 {
 	writer->wal_mode = wal_mode;
 	writer->wal_max_size = wal_max_size;
-	journal_create(&writer->base, wal_mode == WAL_NONE ?
-		       wal_write_in_wal_mode_none : wal_write, NULL);
+
+	journal_create(&writer->base,
+		       wal_mode == WAL_NONE ?
+		       wal_write_none_async : wal_write_async,
+		       txn_complete_async,
+		       wal_mode == WAL_NONE ?
+		       wal_write_none : wal_write,
+		       NULL);
 
 	struct xlog_opts opts = xlog_opts_default;
 	opts.sync_is_async = true;
@@ -1170,9 +1184,18 @@ wal_writer_f(va_list ap)
  * to be written to disk.
  */
 static int
-wal_write(struct journal *journal, struct journal_entry *entry)
+wal_write_async(struct journal *journal, struct journal_entry *entry,
+		void *complete_data)
 {
 	struct wal_writer *writer = (struct wal_writer *) journal;
+	entry->complete_data = complete_data;
+	struct txn *txn = complete_data;
+
+	/*
+	 * After this point the transaction should not
+	 * be bound to the fiber; it is handled by a callback.
+	 */
+	fiber_set_txn(fiber(), NULL);
 
 	ERROR_INJECT(ERRINJ_WAL_IO, {
 		goto fail;
@@ -1220,14 +1243,39 @@ wal_write(struct journal *journal, struct journal_entry *entry)
 	return 0;
 
 fail:
+	/*
+	 * Don't forget to restore transaction
+	 * in a fiber storage: the caller should
+	 * be able to run a rollback procedure.
+	 */
+	fiber_set_txn(fiber(), txn);
 	entry->res = -1;
-	journal_entry_complete(entry);
 	return -1;
 }
 
 static int
-wal_write_in_wal_mode_none(struct journal *journal,
-			   struct journal_entry *entry)
+wal_write(struct journal *journal, struct journal_entry *entry)
+{
+	struct txn *txn = in_txn();
+
+	/*
+	 * We can reuse async WAL engine transparently
+	 * to the caller.
+	 */
+	if (wal_write_async(journal, entry, txn) != 0)
+		return -1;
+
+	bool cancellable = fiber_set_cancellable(false);
+	fiber_yield();
+	fiber_set_cancellable(cancellable);
+
+	return 0;
+}
+
+static int
+wal_write_none_async(struct journal *journal,
+		     struct journal_entry *entry,
+		     void *complete_data)
 {
 	struct wal_writer *writer = (struct wal_writer *) journal;
 	struct vclock vclock_diff;
@@ -1237,10 +1285,18 @@ wal_write_in_wal_mode_none(struct journal *journal,
 	vclock_merge(&writer->vclock, &vclock_diff);
 	vclock_copy(&replicaset.vclock, &writer->vclock);
 	entry->res = vclock_sum(&writer->vclock);
-	journal_entry_complete(entry);
+
+	(void)complete_data;
+	fiber_set_txn(fiber(), NULL);
 	return 0;
 }
 
+static int
+wal_write_none(struct journal *journal, struct journal_entry *entry)
+{
+	return wal_write_none_async(journal, entry, NULL);
+}
+
 void
 wal_init_vy_log()
 {
-- 
2.20.1



More information about the Tarantool-patches mailing list