[Tarantool-patches] [PATCH 10/10] box/journal: redesign sync and async writes

Cyrill Gorcunov gorcunov at gmail.com
Thu Mar 5 15:29:43 MSK 2020


Currently the journal provides only one method -- write,
which implies a callback to trigger upon write completion
(in contrast to the 1.10 series, where all commits were
processed synchronously).

Let's make the difference between sync and async writes more
explicit: provide a journal::write_async method which takes
a callback and callback data as arguments, while
journal::write handles the transaction synchronously.

Redesign notes:

1) Both journal_write and journal_write_async require
   the caller to pass a valid fiber->storage.txn; on
   error the txn must not be reset but preserved, so that
   callers are able to run txn_rollback

2) txn_commit and txn_commit_async call txn_rollback
   where appropriate

3) there is no need to call journal_entry_complete on sync
   writes anymore; txn_commit handles it
   itself

4) wal_write_in_wal_mode_none is too long a name, so it is
   renamed to wal_write_none

Signed-off-by: Cyrill Gorcunov <gorcunov at gmail.com>
---
 src/box/box.cc    |   4 +-
 src/box/journal.c |  21 +++++++--
 src/box/journal.h |  48 ++++++++++++++++++---
 src/box/txn.c     | 102 ++++++++++++++++++++++----------------------
 src/box/wal.c     | 106 ++++++++++++++++++++++++++++++++++++++++++----
 5 files changed, 211 insertions(+), 70 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index eb5931e37..03510eef9 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -322,14 +322,14 @@ recovery_journal_write(struct journal *base,
 {
 	struct recovery_journal *journal = (struct recovery_journal *) base;
 	entry->res = vclock_sum(journal->vclock);
-	journal_entry_complete(entry);
 	return 0;
 }
 
 static inline void
 recovery_journal_create(struct recovery_journal *journal, struct vclock *v)
 {
-	journal_create(&journal->base, recovery_journal_write, NULL);
+	journal_create(&journal->base, journal_no_write_async,
+		       recovery_journal_write, NULL);
 	journal->vclock = v;
 	journal_set(&journal->base);
 }
diff --git a/src/box/journal.c b/src/box/journal.c
index 266ee5d1f..2f78a4bc4 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -32,6 +32,21 @@
 #include <small/region.h>
 #include <diag.h>
 
+int
+journal_no_write_async(struct journal *journal,
+		       struct journal_entry *entry,
+		       journal_entry_complete_cb on_complete_cb,
+		       void *on_complete_cb_data)
+{
+	(void)journal;
+	(void)entry;
+	(void)on_complete_cb;
+	(void)on_complete_cb_data;
+
+	say_error("journal: write_async called from invalid context");
+	return -1;
+}
+
 /**
  * Used to load from a memtx snapshot. LSN is not used,
  * but txn_commit() must work.
@@ -41,13 +56,12 @@ dummy_journal_write(struct journal *journal, struct journal_entry *entry)
 {
 	(void) journal;
 	entry->res = 0;
-	journal_entry_complete(entry);
 	return 0;
 }
 
 static struct journal dummy_journal = {
-	dummy_journal_write,
-	NULL,
+	.write_async	= journal_no_write_async,
+	.write		= dummy_journal_write,
 };
 
 struct journal *current_journal = &dummy_journal;
@@ -75,4 +89,3 @@ journal_entry_new(size_t n_rows, struct region *region)
 
 	return entry;
 }
-
diff --git a/src/box/journal.h b/src/box/journal.h
index e74c69910..fac4d4e78 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -34,6 +34,7 @@
 #include <stdbool.h>
 #include "salad/stailq.h"
 #include "fiber.h"
+#include "txn.h"
 
 #if defined(__cplusplus)
 extern "C" {
@@ -110,6 +111,10 @@ journal_entry_complete(struct journal_entry *entry)
  * synchronous replication.
  */
 struct journal {
+	int (*write_async)(struct journal *journal,
+			   struct journal_entry *entry,
+			   journal_entry_complete_cb on_complete_cb,
+			   void *on_complete_cb_data);
 	int (*write)(struct journal *journal,
 		     struct journal_entry *req);
 	void (*destroy)(struct journal *journal);
@@ -122,16 +127,33 @@ struct journal {
 extern struct journal *current_journal;
 
 /**
- * Send a single entry to write.
+ * Write a single entry to the journal in synchronous way.
  *
- * @return 0 if write was scheduled or -1 in case of an error.
+ * @return 0 if write was processed by a backend or -1 in case of an error.
  */
 static inline int
 journal_write(struct journal_entry *entry)
 {
+	assert(in_txn() != NULL);
 	return current_journal->write(current_journal, entry);
 }
 
+/**
+ * Queue a single entry to the journal in asynchronous way.
+ *
+ * @return 0 if write was queued to a backend or -1 in case of an error.
+ */
+static inline int
+journal_write_async(struct journal_entry *entry,
+		    journal_entry_complete_cb on_complete_cb,
+		    void *on_complete_cb_data)
+{
+	assert(in_txn() != NULL);
+	return current_journal->write_async(current_journal, entry,
+					    on_complete_cb,
+					    on_complete_cb_data);
+}
+
 /**
  * Change the current implementation of the journaling API.
  * Happens during life cycle of an instance:
@@ -163,17 +185,33 @@ journal_set(struct journal *new_journal)
 
 static inline void
 journal_create(struct journal *journal,
+	       int (*write_async)(struct journal *journal,
+				  struct journal_entry *entry,
+				  journal_entry_complete_cb on_complete_cb,
+				  void *on_complete_cb_data),
 	       int (*write)(struct journal *, struct journal_entry *),
 	       void (*destroy)(struct journal *))
 {
-	journal->write = write;
-	journal->destroy = destroy;
+	journal->write_async	= write_async;
+	journal->write		= write;
+	journal->destroy	= destroy;
 }
 
+/**
+ * Stub write_async method for backends without async
+ * write support: logs an error and returns -1.
+ */
+extern int
+journal_no_write_async(struct journal *journal,
+		       struct journal_entry *entry,
+		       journal_entry_complete_cb on_complete_cb,
+		       void *on_complete_cb_data);
+
 static inline bool
 journal_is_initialized(struct journal *journal)
 {
-	return journal->write != NULL;
+	return journal->write != NULL &&
+		journal->write_async != NULL;
 }
 
 #if defined(__cplusplus)
diff --git a/src/box/txn.c b/src/box/txn.c
index 613da181b..27aa3d35e 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -463,7 +463,7 @@ txn_complete(struct txn *txn)
 }
 
 static void
-txn_entry_complete_cb(struct journal_entry *entry, void *data)
+txn_async_complete(struct journal_entry *entry, void *data)
 {
 	struct txn *txn = data;
 	txn->signature = entry->res;
@@ -478,6 +478,10 @@ txn_entry_complete_cb(struct journal_entry *entry, void *data)
 	fiber_set_txn(fiber(), NULL);
 }
 
+/**
+ * Allocate new journal entry with transaction
+ * data to write.
+ */
 static struct journal_entry *
 txn_journal_entry_new(struct txn *txn)
 {
@@ -518,24 +522,6 @@ txn_journal_entry_new(struct txn *txn)
 	return req;
 }
 
-static int64_t
-txn_write_to_wal(struct journal_entry *req)
-{
-	/*
-	 * Send the entry to the journal.
-	*
-	 * After this point the transaction must not be used
-	 * so reset the corresponding key in the fiber storage.
-	 */
-	fiber_set_txn(fiber(), NULL);
-	if (journal_write(req) < 0) {
-		diag_set(ClientError, ER_WAL_IO);
-		diag_log();
-		return -1;
-	}
-	return 0;
-}
-
 /*
  * Prepare a transaction using engines.
  */
@@ -596,42 +582,51 @@ txn_commit_nop(struct txn *txn)
 	return false;
 }
 
+/**
+ * Commit a transaction asynchronously, the
+ * completion is processed by a callback.
+ */
 int
 txn_commit_async(struct txn *txn)
 {
 	struct journal_entry *req;
 
-	if (txn_prepare(txn) != 0) {
-		txn_rollback(txn);
-		return -1;
-	}
+	if (txn_prepare(txn) != 0)
+		goto out_rollback;
 
 	if (txn_commit_nop(txn))
 		return 0;
 
 	req = txn_journal_entry_new(txn);
-	if (req == NULL) {
-		txn_rollback(txn);
-		return -1;
+	if (req == NULL)
+		goto out_rollback;
+
+	if (journal_write_async(req, txn_async_complete, txn) != 0) {
+		diag_set(ClientError, ER_WAL_IO);
+		diag_log();
+		goto out_rollback;
 	}
-	req->on_complete_cb = txn_entry_complete_cb;
-	req->on_complete_cb_data = txn;
 
-	return txn_write_to_wal(req);
+	return 0;
+
+out_rollback:
+	txn_rollback(txn);
+	return -1;
 }
 
+/**
+ * Commit a transaction synchronously.
+ */
 int
 txn_commit(struct txn *txn)
 {
 	struct journal_entry *req;
-	int res = -1;
+	int res;
 
 	txn->fiber = fiber();
 
-	if (txn_prepare(txn) != 0) {
-		txn_rollback(txn);
-		goto out;
-	}
+	if (txn_prepare(txn) != 0)
+		goto out_rollback;
 
 	if (txn_commit_nop(txn)) {
 		res = 0;
@@ -639,33 +634,40 @@ txn_commit(struct txn *txn)
 	}
 
 	req = txn_journal_entry_new(txn);
-	if (req == NULL) {
-		txn_rollback(txn);
-		goto out;
-	}
-	req->on_complete_cb = txn_entry_complete_cb;
-	req->on_complete_cb_data = txn;
-
-	if (txn_write_to_wal(req) != 0)
-		return -1;
+	if (req == NULL)
+		goto out_rollback;
 
 	/*
-	 * In case of non-yielding journal the transaction could already
-	 * be done and there is nothing to wait in such cases.
+	 * FIXME: Move error setup inside the
+	 * journal engine itself. The ClientError
+	 * here is too general.
 	 */
-	if (!txn_has_flag(txn, TXN_IS_DONE)) {
-		bool cancellable = fiber_set_cancellable(false);
-		fiber_yield();
-		fiber_set_cancellable(cancellable);
+
+	if (journal_write(req) != 0) {
+		diag_set(ClientError, ER_WAL_IO);
+		diag_log();
+		goto out_rollback;
 	}
+
+	txn->signature = req->res;
 	res = txn->signature >= 0 ? 0 : -1;
-	if (res != 0)
+	if (res != 0) {
 		diag_set(ClientError, ER_WAL_IO);
+		diag_log();
+	}
 
+	txn_complete(txn);
+	fiber_set_txn(fiber(), NULL);
 out:
+
 	/* Synchronous transactions are freed by the calling fiber. */
 	txn_free(txn);
 	return res;
+
+out_rollback:
+	res = -1;
+	txn_rollback(txn);
+	goto out;
 }
 
 void
diff --git a/src/box/wal.c b/src/box/wal.c
index 1668c9348..dd9563f31 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -32,6 +32,7 @@
 
 #include "vclock.h"
 #include "fiber.h"
+#include "txn.h"
 #include "fio.h"
 #include "errinj.h"
 #include "error.h"
@@ -60,11 +61,19 @@ const char *wal_mode_STRS[] = { "none", "write", "fsync", NULL };
 
 int wal_dir_lock = -1;
 
+static int
+wal_write_async(struct journal *, struct journal_entry *,
+		journal_entry_complete_cb, void *);
+
 static int
 wal_write(struct journal *, struct journal_entry *);
 
 static int
-wal_write_in_wal_mode_none(struct journal *, struct journal_entry *);
+wal_write_none_async(struct journal *, struct journal_entry *,
+		     journal_entry_complete_cb, void *);
+
+static int
+wal_write_none(struct journal *, struct journal_entry *);
 
 /*
  * WAL writer - maintain a Write Ahead Log for every change
@@ -349,8 +358,12 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
 {
 	writer->wal_mode = wal_mode;
 	writer->wal_max_size = wal_max_size;
-	journal_create(&writer->base, wal_mode == WAL_NONE ?
-		       wal_write_in_wal_mode_none : wal_write, NULL);
+	journal_create(&writer->base,
+		       wal_mode == WAL_NONE ?
+		       wal_write_none_async : wal_write_async,
+		       wal_mode == WAL_NONE ?
+		       wal_write_none : wal_write,
+		       NULL);
 
 	struct xlog_opts opts = xlog_opts_default;
 	opts.sync_is_async = true;
@@ -1170,9 +1183,21 @@ wal_writer_f(va_list ap)
  * to be written to disk.
  */
 static int
-wal_write(struct journal *journal, struct journal_entry *entry)
+wal_write_async(struct journal *journal, struct journal_entry *entry,
+		journal_entry_complete_cb on_complete_cb,
+		void *on_complete_cb_data)
 {
 	struct wal_writer *writer = (struct wal_writer *) journal;
+	struct txn *txn = in_txn();
+
+	/*
+	 * After this point the transaction will
+	 * live on its own and processed via callbacks,
+	 * so reset the fiber storage.
+	 */
+	entry->on_complete_cb = on_complete_cb;
+	entry->on_complete_cb_data = on_complete_cb_data;
+	fiber_set_txn(fiber(), NULL);
 
 	ERROR_INJECT(ERRINJ_WAL_IO, {
 		goto fail;
@@ -1220,27 +1245,90 @@ wal_write(struct journal *journal, struct journal_entry *entry)
 	return 0;
 
 fail:
+	/*
+	 * Don't forget to restore transaction
+	 * in a fiber storage: the caller should
+	 * be able to run a rollback procedure.
+	 */
+	fiber_set_txn(fiber(), txn);
 	entry->res = -1;
-	journal_entry_complete(entry);
+	txn->signature = -1;
 	return -1;
 }
 
+static void
+wal_write_cb(struct journal_entry *entry, void *data)
+{
+	struct txn *txn = data;
+	(void)entry;
+
+	/*
+	 * On synchronous write just wake up
+	 * the waiter which will complete the
+	 * transaction.
+	 */
+	fiber_wakeup(txn->fiber);
+}
+
+/*
+ * Queue the entry and sleep until wal_write_cb() wakes us up.
+ */
 static int
-wal_write_in_wal_mode_none(struct journal *journal,
-			   struct journal_entry *entry)
+wal_write(struct journal *journal, struct journal_entry *entry)
 {
-	struct wal_writer *writer = (struct wal_writer *) journal;
+	struct txn *txn = in_txn();
+
+	/*
+	 * Reuse the async path; wal_write_cb() is the completion.
+	 */
+	if (wal_write_async(journal, entry, wal_write_cb, txn) != 0)
+		return -1;
+
+	bool cancellable = fiber_set_cancellable(false);
+	fiber_yield();
+	fiber_set_cancellable(cancellable);
+
+	/*
+	 * Unlike the async write, put the transaction back
+	 * into the fiber storage: the caller is the one
+	 * who finishes the transaction.
+	 */
+	fiber_set_txn(fiber(), txn);
+	return 0;
+}
+
+static int
+wal_write_none_async(struct journal *journal,
+		     struct journal_entry *entry,
+		     journal_entry_complete_cb on_complete_cb,
+		     void *on_complete_cb_data)
+{
+	struct wal_writer *writer = (struct wal_writer *)journal;
 	struct vclock vclock_diff;
+	struct txn *txn = in_txn();
+
+	/* Sync callers keep the txn in the fiber, see wal_write(). */
+	if (on_complete_cb != NULL)
+		fiber_set_txn(fiber(), NULL);
+
 	vclock_create(&vclock_diff);
 	wal_assign_lsn(&vclock_diff, &writer->vclock, entry->rows,
 		       entry->rows + entry->n_rows);
 	vclock_merge(&writer->vclock, &vclock_diff);
 	vclock_copy(&replicaset.vclock, &writer->vclock);
 	entry->res = vclock_sum(&writer->vclock);
-	journal_entry_complete(entry);
+	txn->signature = entry->res;
+	if (on_complete_cb != NULL)
+		on_complete_cb(entry, on_complete_cb_data);
 	return 0;
 }
 
+static int
+wal_write_none(struct journal *journal, struct journal_entry *entry)
+{
+	return wal_write_none_async(journal, entry, NULL, NULL);
+}
+
 void
 wal_init_vy_log()
 {
-- 
2.20.1



More information about the Tarantool-patches mailing list