[Tarantool-patches] [PATCH v13 11/11] box/journal: redesign journal operations

Cyrill Gorcunov gorcunov at gmail.com
Thu Mar 19 19:20:16 MSK 2020


On Thu, Mar 19, 2020 at 02:12:06PM +0300, Konstantin Osipov wrote:
> * Cyrill Gorcunov <gorcunov at gmail.com> [20/03/19 13:51]:
> 
> I really think it's not worth it, adding a circular dependency.
> 
> you can declare struct txn in wal.cc, and pass it around by
> pointer without using the header.
> 
> The whole goal of this refactoring was to move fiber_set_txn() 
> outside wal.cc, to txn.cc.
> 
> Is it still impossible? let's discuss.
> 
> Module boundaries are really important long term. You add it for a
> valid reason, and some other patch will begin using txn inside wal
> for a silly reason.

Kostya, take a look please, does it look better? I had to drop passing
completion data outside but assign it to the entry; this is close to what
Gosha has, except more explicit I think. At the same time, in wal_init I need
to pass write_async_cb function so the journal would save it internally
and won't waste memory on every journal entry.

The whole series sits in gorcunov/gh-4031-txn_write_to_wal-14

---
>From 39334ef60ad8101546bc7a177b07eed10b0c3d65 Mon Sep 17 00:00:00 2001
From: Cyrill Gorcunov <gorcunov at gmail.com>
Date: Wed, 18 Mar 2020 22:36:37 +0300
Subject: [PATCH] box/journal: redesign journal operations

Currently the journal provides only one method -- write,
which implies a callback to trigger upon write completion
(in contrast with the 1.10 series where all commits were
processed in a synchronous way).

Let's make the difference between sync and async writes more
notable: provide a journal::write_async method which runs a
completion function once the entry is written; in turn,
journal::write handles the transaction in a synchronous way.

Redesign notes:

1) The callback for an async write is set once at journal
   creation. There is no need to carry the callback in
   every journal entry. This allows us to save some
   memory;

2) txn_commit and txn_commit_async call txn_rollback
   where appropriate;

3) no need to call journal_entry_complete on sync
   writes anymore;

4) wal_write_in_wal_mode_none is too long, renamed
   to wal_write_none;

5) the wal engine uses async writes internally, but it is
   transparent to callers.

Signed-off-by: Cyrill Gorcunov <gorcunov at gmail.com>
---
 src/box/box.cc    |  8 ++---
 src/box/journal.c | 33 +++++++++++++-----
 src/box/journal.h | 87 +++++++++++++++++++++++++++++++----------------
 src/box/txn.c     | 61 ++++++++++++++++-----------------
 src/box/txn.h     |  2 +-
 src/box/vy_log.c  |  5 +--
 src/box/wal.c     | 61 ++++++++++++++++++++++++++-------
 src/box/wal.h     |  4 +--
 8 files changed, 172 insertions(+), 89 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index cf79affca..3a3bda78e 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -322,7 +322,6 @@ recovery_journal_write(struct journal *base,
 {
 	struct recovery_journal *journal = (struct recovery_journal *) base;
 	entry->res = vclock_sum(journal->vclock);
-	journal_entry_complete(entry);
 	return 0;
 }
 
@@ -330,8 +329,9 @@ static void
 recovery_journal_create(struct vclock *v)
 {
 	static struct recovery_journal journal;
-
-	journal_create(&journal.base, recovery_journal_write, NULL);
+	journal_create(&journal.base, journal_no_write_async,
+		       journal_no_write_async_cb,
+		       recovery_journal_write, NULL);
 	journal.vclock = v;
 	journal_set(&journal.base);
 }
@@ -2353,7 +2353,7 @@ box_cfg_xc(void)
 
 	int64_t wal_max_size = box_check_wal_max_size(cfg_geti64("wal_max_size"));
 	enum wal_mode wal_mode = box_check_wal_mode(cfg_gets("wal_mode"));
-	if (wal_init(wal_mode, cfg_gets("wal_dir"),
+	if (wal_init(wal_mode, txn_complete_async, cfg_gets("wal_dir"),
 		     wal_max_size, &INSTANCE_UUID, on_wal_garbage_collection,
 		     on_wal_checkpoint_threshold) != 0) {
 		diag_raise();
diff --git a/src/box/journal.c b/src/box/journal.c
index 11e78990d..6c4a0850a 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -32,6 +32,24 @@
 #include <small/region.h>
 #include <diag.h>
 
+int
+journal_no_write_async(struct journal *journal,
+		       struct journal_entry *entry)
+{
+	(void)journal;
+
+	say_error("journal: write_async invalid context");
+	entry->res = -1;
+	return -1;
+}
+
+void
+journal_no_write_async_cb(struct journal_entry *entry)
+{
+	say_error("journal: write_async_cb invalid context");
+	entry->res = -1;
+}
+
 /**
  * Used to load from a memtx snapshot. LSN is not used,
  * but txn_commit() must work.
@@ -41,21 +59,20 @@ dummy_journal_write(struct journal *journal, struct journal_entry *entry)
 {
 	(void) journal;
 	entry->res = 0;
-	journal_entry_complete(entry);
 	return 0;
 }
 
 static struct journal dummy_journal = {
-	dummy_journal_write,
-	NULL,
+	.write_async	= journal_no_write_async,
+	.write_async_cb	= journal_no_write_async_cb,
+	.write		= dummy_journal_write,
 };
 
 struct journal *current_journal = &dummy_journal;
 
 struct journal_entry *
 journal_entry_new(size_t n_rows, struct region *region,
-		  journal_entry_complete_cb on_complete_cb,
-		  void *on_complete_cb_data)
+		  void *complete_data)
 {
 	struct journal_entry *entry;
 
@@ -68,11 +85,11 @@ journal_entry_new(size_t n_rows, struct region *region,
 		diag_set(OutOfMemory, size, "region", "struct journal_entry");
 		return NULL;
 	}
+
+	entry->complete_data = complete_data;
 	entry->approx_len = 0;
 	entry->n_rows = n_rows;
 	entry->res = -1;
-	entry->on_complete_cb = on_complete_cb;
-	entry->on_complete_cb_data = on_complete_cb_data;
+
 	return entry;
 }
-
diff --git a/src/box/journal.h b/src/box/journal.h
index 64f167c6f..045302906 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -34,6 +34,7 @@
 #include <stdbool.h>
 #include "salad/stailq.h"
 #include "fiber.h"
+#include "txn.h"
 
 #if defined(__cplusplus)
 extern "C" {
@@ -42,9 +43,6 @@ extern "C" {
 struct xrow_header;
 struct journal_entry;
 
-/** Journal entry finalization callback typedef. */
-typedef void (*journal_entry_complete_cb)(struct journal_entry *entry, void *data);
-
 /**
  * An entry for an abstract journal.
  * Simply put, a write ahead log request.
@@ -60,17 +58,10 @@ struct journal_entry {
 	 * the committed transaction, on error is -1
 	 */
 	int64_t res;
-	/**
-	 * A journal entry finalization callback which is going to be called
-	 * after the entry processing was finished in both cases: success
-	 * or fail. Entry->res is set to a result value before the callback
-	 * is fired.
-	 */
-	journal_entry_complete_cb on_complete_cb;
 	/**
 	 * A journal entry completion callback argument.
 	 */
-	void *on_complete_cb_data;
+	void *complete_data;
 	/**
 	 * Approximate size of this request when encoded.
 	 */
@@ -94,17 +85,7 @@ struct region;
  */
 struct journal_entry *
 journal_entry_new(size_t n_rows, struct region *region,
-		  journal_entry_complete_cb on_complete_cb,
-		  void *on_complete_cb_data);
-
-/**
- * Finalize a single entry.
- */
-static inline void
-journal_entry_complete(struct journal_entry *entry)
-{
-	entry->on_complete_cb(entry, entry->on_complete_cb_data);
-}
+		  void *complete_data);
 
 /**
  * An API for an abstract journal for all transactions of this
@@ -112,11 +93,31 @@ journal_entry_complete(struct journal_entry *entry)
  * synchronous replication.
  */
 struct journal {
+	/** Asynchronous write */
+	int (*write_async)(struct journal *journal,
+			   struct journal_entry *entry);
+
+	/** Asynchronous write completion */
+	void (*write_async_cb)(struct journal_entry *entry);
+
+	/** Synchronous write */
 	int (*write)(struct journal *journal,
-		     struct journal_entry *req);
+		     struct journal_entry *entry);
+
+	/** Journal destroy */
 	void (*destroy)(struct journal *journal);
 };
 
+/**
+ * Finalize a single entry.
+ */
+static inline void
+journal_async_complete(struct journal *journal, struct journal_entry *entry)
+{
+	assert(journal->write_async_cb != NULL);
+	journal->write_async_cb(entry);
+}
+
 /**
  * Depending on the step of recovery and instance configuration
  * points at a concrete implementation of the journal.
@@ -124,9 +125,9 @@ struct journal {
 extern struct journal *current_journal;
 
 /**
- * Send a single entry to write.
+ * Write a single entry to the journal in synchronous way.
  *
- * @return 0 if write was scheduled or -1 in case of an error.
+ * @return 0 if write was processed by a backend or -1 in case of an error.
  */
 static inline int
 journal_write(struct journal_entry *entry)
@@ -134,6 +135,17 @@ journal_write(struct journal_entry *entry)
 	return current_journal->write(current_journal, entry);
 }
 
+/**
+ * Queue a single entry to the journal in asynchronous way.
+ *
+ * @return 0 if write was queued to a backend or -1 in case of an error.
+ */
+static inline int
+journal_write_async(struct journal_entry *entry)
+{
+	return current_journal->write_async(current_journal, entry);
+}
+
 /**
  * Change the current implementation of the journaling API.
  * Happens during life cycle of an instance:
@@ -165,13 +177,30 @@ journal_set(struct journal *new_journal)
 
 static inline void
 journal_create(struct journal *journal,
-	       int (*write)(struct journal *, struct journal_entry *),
-	       void (*destroy)(struct journal *))
+	       int (*write_async)(struct journal *journal,
+				  struct journal_entry *entry),
+	       void (*write_async_cb)(struct journal_entry *entry),
+	       int (*write)(struct journal *journal,
+			    struct journal_entry *entry),
+	       void (*destroy)(struct journal *journal))
 {
-	journal->write = write;
-	journal->destroy = destroy;
+	journal->write_async	= write_async;
+	journal->write_async_cb	= write_async_cb;
+	journal->write		= write;
+	journal->destroy	= destroy;
 }
 
+/**
+ * A stub to issue an error in case asynchronous
+ * write is disabled in the backend.
+ */
+extern int
+journal_no_write_async(struct journal *journal,
+		       struct journal_entry *entry);
+
+extern void
+journal_no_write_async_cb(struct journal_entry *entry);
+
 static inline bool
 journal_is_initialized(struct journal *journal)
 {
diff --git a/src/box/txn.c b/src/box/txn.c
index 11c20aceb..b42df3df6 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -463,9 +463,9 @@ txn_complete(struct txn *txn)
 }
 
 void
-txn_complete_async(struct journal_entry *entry, void *complete_data)
+txn_complete_async(struct journal_entry *entry)
 {
-	struct txn *txn = complete_data;
+	struct txn *txn = entry->complete_data;
 	txn->signature = entry->res;
 	/*
 	 * Some commit/rollback triggers require for in_txn fiber
@@ -487,7 +487,7 @@ txn_journal_entry_new(struct txn *txn)
 	assert(txn->n_new_rows + txn->n_applier_rows > 0);
 
 	req = journal_entry_new(txn->n_new_rows + txn->n_applier_rows,
-				&txn->region, txn_complete_async, txn);
+				&txn->region, txn);
 	if (req == NULL)
 		return NULL;
 
@@ -518,24 +518,6 @@ txn_journal_entry_new(struct txn *txn)
 	return req;
 }
 
-static int64_t
-txn_write_to_wal(struct journal_entry *req)
-{
-	/*
-	 * Send the entry to the journal.
-	 *
-	 * After this point the transaction must not be used
-	 * so reset the corresponding key in the fiber storage.
-	 */
-	fiber_set_txn(fiber(), NULL);
-	if (journal_write(req) < 0) {
-		diag_set(ClientError, ER_WAL_IO);
-		diag_log();
-		return -1;
-	}
-	return 0;
-}
-
 /*
  * Prepare a transaction using engines.
  */
@@ -608,7 +590,17 @@ txn_commit_async(struct txn *txn)
 		return -1;
 	}
 
-	return txn_write_to_wal(req);
+	fiber_set_txn(fiber(), NULL);
+	if (journal_write_async(req) != 0) {
+		fiber_set_txn(fiber(), txn);
+		txn_rollback(txn);
+
+		diag_set(ClientError, ER_WAL_IO);
+		diag_log();
+		return -1;
+	}
+
+	return 0;
 }
 
 int
@@ -636,21 +628,28 @@ txn_commit(struct txn *txn)
 		return -1;
 	}
 
-	if (txn_write_to_wal(req) != 0)
+	fiber_set_txn(fiber(), NULL);
+	if (journal_write(req) != 0) {
+		fiber_set_txn(fiber(), txn);
+		txn_rollback(txn);
+		txn_free(txn);
+
+		diag_set(ClientError, ER_WAL_IO);
+		diag_log();
 		return -1;
+	}
 
-	/*
-	 * In case of non-yielding journal the transaction could already
-	 * be done and there is nothing to wait in such cases.
-	 */
 	if (!txn_has_flag(txn, TXN_IS_DONE)) {
-		bool cancellable = fiber_set_cancellable(false);
-		fiber_yield();
-		fiber_set_cancellable(cancellable);
+		txn->signature = req->res;
+		txn_complete(txn);
+		fiber_set_txn(fiber(), NULL);
 	}
+
 	int res = txn->signature >= 0 ? 0 : -1;
-	if (res != 0)
+	if (res != 0) {
 		diag_set(ClientError, ER_WAL_IO);
+		diag_log();
+	}
 
 	/* Synchronous transactions are freed by the calling fiber. */
 	txn_free(txn);
diff --git a/src/box/txn.h b/src/box/txn.h
index 572c76d84..3f6d79d5c 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -292,7 +292,7 @@ txn_rollback(struct txn *txn);
  * Complete asynchronous transaction.
  */
 void
-txn_complete_async(struct journal_entry *entry, void *complete_data);
+txn_complete_async(struct journal_entry *entry);
 
 /**
  * Submit a transaction to the journal.
diff --git a/src/box/vy_log.c b/src/box/vy_log.c
index cb291f3c8..9ead066af 100644
--- a/src/box/vy_log.c
+++ b/src/box/vy_log.c
@@ -815,8 +815,9 @@ vy_log_tx_flush(struct vy_log_tx *tx)
 		tx_size++;
 
 	size_t used = region_used(&fiber()->gc);
-	struct journal_entry *entry = journal_entry_new(tx_size, &fiber()->gc,
-							NULL, NULL);
+
+	struct journal_entry *entry;
+	entry = journal_entry_new(tx_size, &fiber()->gc, NULL);
 	if (entry == NULL)
 		goto err;
 
diff --git a/src/box/wal.c b/src/box/wal.c
index 1668c9348..3b094b0e8 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -60,11 +60,17 @@ const char *wal_mode_STRS[] = { "none", "write", "fsync", NULL };
 
 int wal_dir_lock = -1;
 
+static int
+wal_write_async(struct journal *, struct journal_entry *);
+
 static int
 wal_write(struct journal *, struct journal_entry *);
 
 static int
-wal_write_in_wal_mode_none(struct journal *, struct journal_entry *);
+wal_write_none_async(struct journal *, struct journal_entry *);
+
+static int
+wal_write_none(struct journal *, struct journal_entry *);
 
 /*
  * WAL writer - maintain a Write Ahead Log for every change
@@ -253,9 +259,10 @@ xlog_write_entry(struct xlog *l, struct journal_entry *entry)
 static void
 tx_schedule_queue(struct stailq *queue)
 {
+	struct wal_writer *writer = &wal_writer_singleton;
 	struct journal_entry *req, *tmp;
 	stailq_foreach_entry_safe(req, tmp, queue, fifo)
-		journal_entry_complete(req);
+		journal_async_complete(&writer->base, req);
 }
 
 /**
@@ -342,6 +349,7 @@ tx_notify_checkpoint(struct cmsg *msg)
  */
 static void
 wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
+		  void (*wall_async_cb)(struct journal_entry *entry),
 		  const char *wal_dirname,
 		  int64_t wal_max_size, const struct tt_uuid *instance_uuid,
 		  wal_on_garbage_collection_f on_garbage_collection,
@@ -349,8 +357,14 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
 {
 	writer->wal_mode = wal_mode;
 	writer->wal_max_size = wal_max_size;
-	journal_create(&writer->base, wal_mode == WAL_NONE ?
-		       wal_write_in_wal_mode_none : wal_write, NULL);
+
+	journal_create(&writer->base,
+		       wal_mode == WAL_NONE ?
+		       wal_write_none_async : wal_write_async,
+		       wall_async_cb,
+		       wal_mode == WAL_NONE ?
+		       wal_write_none : wal_write,
+		       NULL);
 
 	struct xlog_opts opts = xlog_opts_default;
 	opts.sync_is_async = true;
@@ -458,14 +472,14 @@ wal_open(struct wal_writer *writer)
 }
 
 int
-wal_init(enum wal_mode wal_mode, const char *wal_dirname,
-	 int64_t wal_max_size, const struct tt_uuid *instance_uuid,
+wal_init(enum wal_mode wal_mode, void (*wall_async_cb)(struct journal_entry *entry),
+	 const char *wal_dirname, int64_t wal_max_size, const struct tt_uuid *instance_uuid,
 	 wal_on_garbage_collection_f on_garbage_collection,
 	 wal_on_checkpoint_threshold_f on_checkpoint_threshold)
 {
 	/* Initialize the state. */
 	struct wal_writer *writer = &wal_writer_singleton;
-	wal_writer_create(writer, wal_mode, wal_dirname,
+	wal_writer_create(writer, wal_mode, wall_async_cb, wal_dirname,
 			  wal_max_size, instance_uuid, on_garbage_collection,
 			  on_checkpoint_threshold);
 
@@ -1170,7 +1184,7 @@ wal_writer_f(va_list ap)
  * to be written to disk.
  */
 static int
-wal_write(struct journal *journal, struct journal_entry *entry)
+wal_write_async(struct journal *journal, struct journal_entry *entry)
 {
 	struct wal_writer *writer = (struct wal_writer *) journal;
 
@@ -1221,26 +1235,49 @@ wal_write(struct journal *journal, struct journal_entry *entry)
 
 fail:
 	entry->res = -1;
-	journal_entry_complete(entry);
 	return -1;
 }
 
 static int
-wal_write_in_wal_mode_none(struct journal *journal,
-			   struct journal_entry *entry)
+wal_write(struct journal *journal, struct journal_entry *entry)
+{
+	/*
+	 * We can reuse async WAL engine transparently
+	 * to the caller.
+	 */
+	if (wal_write_async(journal, entry) != 0)
+		return -1;
+
+	bool cancellable = fiber_set_cancellable(false);
+	fiber_yield();
+	fiber_set_cancellable(cancellable);
+
+	return 0;
+}
+
+static int
+wal_write_none_async(struct journal *journal,
+		     struct journal_entry *entry)
 {
 	struct wal_writer *writer = (struct wal_writer *) journal;
 	struct vclock vclock_diff;
+
 	vclock_create(&vclock_diff);
 	wal_assign_lsn(&vclock_diff, &writer->vclock, entry->rows,
 		       entry->rows + entry->n_rows);
 	vclock_merge(&writer->vclock, &vclock_diff);
 	vclock_copy(&replicaset.vclock, &writer->vclock);
 	entry->res = vclock_sum(&writer->vclock);
-	journal_entry_complete(entry);
+
 	return 0;
 }
 
+static int
+wal_write_none(struct journal *journal, struct journal_entry *entry)
+{
+	return wal_write_none_async(journal, entry);
+}
+
 void
 wal_init_vy_log()
 {
diff --git a/src/box/wal.h b/src/box/wal.h
index 76b44941a..11a66a20a 100644
--- a/src/box/wal.h
+++ b/src/box/wal.h
@@ -81,8 +81,8 @@ typedef void (*wal_on_checkpoint_threshold_f)(void);
  * Start WAL thread and initialize WAL writer.
  */
 int
-wal_init(enum wal_mode wal_mode, const char *wal_dirname,
-	 int64_t wal_max_size, const struct tt_uuid *instance_uuid,
+wal_init(enum wal_mode wal_mode, void (*wall_async_cb)(struct journal_entry *entry),
+	 const char *wal_dirname, int64_t wal_max_size, const struct tt_uuid *instance_uuid,
 	 wal_on_garbage_collection_f on_garbage_collection,
 	 wal_on_checkpoint_threshold_f on_checkpoint_threshold);
 
-- 
2.20.1



More information about the Tarantool-patches mailing list