[Tarantool-patches] [PATCH 10/10] box/journal: redesign sync and async writes
Cyrill Gorcunov
gorcunov at gmail.com
Thu Mar 5 15:29:43 MSK 2020
Currently the journal provides only one method -- write,
which implies a callback to trigger upon write completion
(in contrast to the 1.10 series, where all commits were
processed synchronously).
Let's make the difference between sync and async writes more
explicit: provide a journal::write_async method which takes
a callback and callback data as arguments, while
journal::write handles the transaction synchronously.
Redesign notes:
1) Both journal_write and journal_write_async require
the caller to have a valid fiber->storage.txn; on
error the txn must not be reset but preserved, so
callers are able to run txn_rollback
2) txn_commit and txn_commit_async call txn_rollback
where appropriate
3) there is no need to call journal_entry_complete on
sync writes anymore, it is handled by txn_commit
itself
4) wal_write_in_wal_mode_none is too long, renamed
to wal_write_none
Signed-off-by: Cyrill Gorcunov <gorcunov at gmail.com>
---
src/box/box.cc | 4 +-
src/box/journal.c | 21 +++++++--
src/box/journal.h | 48 ++++++++++++++++++---
src/box/txn.c | 102 ++++++++++++++++++++++----------------------
src/box/wal.c | 106 ++++++++++++++++++++++++++++++++++++++++++----
5 files changed, 211 insertions(+), 70 deletions(-)
diff --git a/src/box/box.cc b/src/box/box.cc
index eb5931e37..03510eef9 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -322,14 +322,14 @@ recovery_journal_write(struct journal *base,
{
struct recovery_journal *journal = (struct recovery_journal *) base;
entry->res = vclock_sum(journal->vclock);
- journal_entry_complete(entry);
return 0;
}
static inline void
recovery_journal_create(struct recovery_journal *journal, struct vclock *v)
{
- journal_create(&journal->base, recovery_journal_write, NULL);
+ journal_create(&journal->base, journal_no_write_async,
+ recovery_journal_write, NULL);
journal->vclock = v;
journal_set(&journal->base);
}
diff --git a/src/box/journal.c b/src/box/journal.c
index 266ee5d1f..2f78a4bc4 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -32,6 +32,21 @@
#include <small/region.h>
#include <diag.h>
+int
+journal_no_write_async(struct journal *journal,
+ struct journal_entry *entry,
+ journal_entry_complete_cb on_complete_cb,
+ void *on_complete_cb_data)
+{
+ (void)journal;
+ (void)entry;
+ (void)on_complete_cb;
+ (void)on_complete_cb_data;
+
+ say_error("journal: write_async called from invalid context");
+ return -1;
+}
+
/**
* Used to load from a memtx snapshot. LSN is not used,
* but txn_commit() must work.
@@ -41,13 +56,12 @@ dummy_journal_write(struct journal *journal, struct journal_entry *entry)
{
(void) journal;
entry->res = 0;
- journal_entry_complete(entry);
return 0;
}
static struct journal dummy_journal = {
- dummy_journal_write,
- NULL,
+ .write_async = journal_no_write_async,
+ .write = dummy_journal_write,
};
struct journal *current_journal = &dummy_journal;
@@ -75,4 +89,3 @@ journal_entry_new(size_t n_rows, struct region *region)
return entry;
}
-
diff --git a/src/box/journal.h b/src/box/journal.h
index e74c69910..fac4d4e78 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -34,6 +34,7 @@
#include <stdbool.h>
#include "salad/stailq.h"
#include "fiber.h"
+#include "txn.h"
#if defined(__cplusplus)
extern "C" {
@@ -110,6 +111,10 @@ journal_entry_complete(struct journal_entry *entry)
* synchronous replication.
*/
struct journal {
+ int (*write_async)(struct journal *journal,
+ struct journal_entry *entry,
+ journal_entry_complete_cb on_complete_cb,
+ void *on_complete_cb_data);
int (*write)(struct journal *journal,
struct journal_entry *req);
void (*destroy)(struct journal *journal);
@@ -122,16 +127,33 @@ struct journal {
extern struct journal *current_journal;
/**
- * Send a single entry to write.
+ * Write a single entry to the journal in synchronous way.
*
- * @return 0 if write was scheduled or -1 in case of an error.
+ * @return 0 if write was processed by a backend or -1 in case of an error.
*/
static inline int
journal_write(struct journal_entry *entry)
{
+ assert(in_txn() != NULL);
return current_journal->write(current_journal, entry);
}
+/**
+ * Queue a single entry to the journal in asynchronous way.
+ *
+ * @return 0 if write was queued to a backend or -1 in case of an error.
+ */
+static inline int
+journal_write_async(struct journal_entry *entry,
+ journal_entry_complete_cb on_complete_cb,
+ void *on_complete_cb_data)
+{
+ assert(in_txn() != NULL);
+ return current_journal->write_async(current_journal, entry,
+ on_complete_cb,
+ on_complete_cb_data);
+}
+
/**
* Change the current implementation of the journaling API.
* Happens during life cycle of an instance:
@@ -163,17 +185,33 @@ journal_set(struct journal *new_journal)
static inline void
journal_create(struct journal *journal,
+ int (*write_async)(struct journal *journal,
+ struct journal_entry *entry,
+ journal_entry_complete_cb on_complete_cb,
+ void *on_complete_cb_data),
int (*write)(struct journal *, struct journal_entry *),
void (*destroy)(struct journal *))
{
- journal->write = write;
- journal->destroy = destroy;
+ journal->write_async = write_async;
+ journal->write = write;
+ journal->destroy = destroy;
}
+/**
+ * A stub for backends without asynchronous write
+ * support: it logs an error and returns -1.
+ */
+extern int
+journal_no_write_async(struct journal *journal,
+ struct journal_entry *entry,
+ journal_entry_complete_cb on_complete_cb,
+ void *on_complete_cb_data);
+
static inline bool
journal_is_initialized(struct journal *journal)
{
- return journal->write != NULL;
+ return journal->write != NULL &&
+ journal->write_async != NULL;
}
#if defined(__cplusplus)
diff --git a/src/box/txn.c b/src/box/txn.c
index 613da181b..27aa3d35e 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -463,7 +463,7 @@ txn_complete(struct txn *txn)
}
static void
-txn_entry_complete_cb(struct journal_entry *entry, void *data)
+txn_async_complete(struct journal_entry *entry, void *data)
{
struct txn *txn = data;
txn->signature = entry->res;
@@ -478,6 +478,10 @@ txn_entry_complete_cb(struct journal_entry *entry, void *data)
fiber_set_txn(fiber(), NULL);
}
+/**
+ * Allocate new journal entry with transaction
+ * data to write.
+ */
static struct journal_entry *
txn_journal_entry_new(struct txn *txn)
{
@@ -518,24 +522,6 @@ txn_journal_entry_new(struct txn *txn)
return req;
}
-static int64_t
-txn_write_to_wal(struct journal_entry *req)
-{
- /*
- * Send the entry to the journal.
- *
- * After this point the transaction must not be used
- * so reset the corresponding key in the fiber storage.
- */
- fiber_set_txn(fiber(), NULL);
- if (journal_write(req) < 0) {
- diag_set(ClientError, ER_WAL_IO);
- diag_log();
- return -1;
- }
- return 0;
-}
-
/*
* Prepare a transaction using engines.
*/
@@ -596,42 +582,51 @@ txn_commit_nop(struct txn *txn)
return false;
}
+/**
+ * Commit a transaction asynchronously, the
+ * completion is processed by a callback.
+ */
int
txn_commit_async(struct txn *txn)
{
struct journal_entry *req;
- if (txn_prepare(txn) != 0) {
- txn_rollback(txn);
- return -1;
- }
+ if (txn_prepare(txn) != 0)
+ goto out_rollback;
if (txn_commit_nop(txn))
return 0;
req = txn_journal_entry_new(txn);
- if (req == NULL) {
- txn_rollback(txn);
- return -1;
+ if (req == NULL)
+ goto out_rollback;
+
+ if (journal_write_async(req, txn_async_complete, txn) != 0) {
+ diag_set(ClientError, ER_WAL_IO);
+ diag_log();
+ goto out_rollback;
}
- req->on_complete_cb = txn_entry_complete_cb;
- req->on_complete_cb_data = txn;
- return txn_write_to_wal(req);
+ return 0;
+
+out_rollback:
+ txn_rollback(txn);
+ return -1;
}
+/**
+ * Commit a transaction synchronously.
+ */
int
txn_commit(struct txn *txn)
{
struct journal_entry *req;
- int res = -1;
+ int res;
txn->fiber = fiber();
- if (txn_prepare(txn) != 0) {
- txn_rollback(txn);
- goto out;
- }
+ if (txn_prepare(txn) != 0)
+ goto out_rollback;
if (txn_commit_nop(txn)) {
res = 0;
@@ -639,33 +634,40 @@ txn_commit(struct txn *txn)
}
req = txn_journal_entry_new(txn);
- if (req == NULL) {
- txn_rollback(txn);
- goto out;
- }
- req->on_complete_cb = txn_entry_complete_cb;
- req->on_complete_cb_data = txn;
-
- if (txn_write_to_wal(req) != 0)
- return -1;
+ if (req == NULL)
+ goto out_rollback;
/*
- * In case of non-yielding journal the transaction could already
- * be done and there is nothing to wait in such cases.
+ * FIXME: Move error setup inside the
+ * journal engine itself. The ClientError
+ * here is too general.
*/
- if (!txn_has_flag(txn, TXN_IS_DONE)) {
- bool cancellable = fiber_set_cancellable(false);
- fiber_yield();
- fiber_set_cancellable(cancellable);
+
+ if (journal_write(req) != 0) {
+ diag_set(ClientError, ER_WAL_IO);
+ diag_log();
+ goto out_rollback;
}
+
+ txn->signature = req->res;
res = txn->signature >= 0 ? 0 : -1;
- if (res != 0)
+ if (res != 0) {
diag_set(ClientError, ER_WAL_IO);
+ diag_log();
+ }
+ txn_complete(txn);
+ fiber_set_txn(fiber(), NULL);
out:
+
/* Synchronous transactions are freed by the calling fiber. */
txn_free(txn);
return res;
+
+out_rollback:
+ res = -1;
+ txn_rollback(txn);
+ goto out;
}
void
diff --git a/src/box/wal.c b/src/box/wal.c
index 1668c9348..dd9563f31 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -32,6 +32,7 @@
#include "vclock.h"
#include "fiber.h"
+#include "txn.h"
#include "fio.h"
#include "errinj.h"
#include "error.h"
@@ -60,11 +61,19 @@ const char *wal_mode_STRS[] = { "none", "write", "fsync", NULL };
int wal_dir_lock = -1;
+static int
+wal_write_async(struct journal *, struct journal_entry *,
+ journal_entry_complete_cb, void *);
+
static int
wal_write(struct journal *, struct journal_entry *);
static int
-wal_write_in_wal_mode_none(struct journal *, struct journal_entry *);
+wal_write_none_async(struct journal *, struct journal_entry *,
+ journal_entry_complete_cb, void *);
+
+static int
+wal_write_none(struct journal *, struct journal_entry *);
/*
* WAL writer - maintain a Write Ahead Log for every change
@@ -349,8 +358,12 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
{
writer->wal_mode = wal_mode;
writer->wal_max_size = wal_max_size;
- journal_create(&writer->base, wal_mode == WAL_NONE ?
- wal_write_in_wal_mode_none : wal_write, NULL);
+ journal_create(&writer->base,
+ wal_mode == WAL_NONE ?
+ wal_write_none_async : wal_write_async,
+ wal_mode == WAL_NONE ?
+ wal_write_none : wal_write,
+ NULL);
struct xlog_opts opts = xlog_opts_default;
opts.sync_is_async = true;
@@ -1170,9 +1183,21 @@ wal_writer_f(va_list ap)
* to be written to disk.
*/
static int
-wal_write(struct journal *journal, struct journal_entry *entry)
+wal_write_async(struct journal *journal, struct journal_entry *entry,
+ journal_entry_complete_cb on_complete_cb,
+ void *on_complete_cb_data)
{
struct wal_writer *writer = (struct wal_writer *) journal;
+ struct txn *txn = in_txn();
+
+ /*
+ * After this point the transaction will
+ * live on its own and processed via callbacks,
+ * so reset the fiber storage.
+ */
+ entry->on_complete_cb = on_complete_cb;
+ entry->on_complete_cb_data = on_complete_cb_data;
+ fiber_set_txn(fiber(), NULL);
ERROR_INJECT(ERRINJ_WAL_IO, {
goto fail;
@@ -1220,27 +1245,90 @@ wal_write(struct journal *journal, struct journal_entry *entry)
return 0;
fail:
+ /*
+ * Don't forget to restore transaction
+ * in a fiber storage: the caller should
+ * be able to run a rollback procedure.
+ */
+ fiber_set_txn(fiber(), txn);
entry->res = -1;
- journal_entry_complete(entry);
+ txn->signature = -1;
return -1;
}
+static void
+wal_write_cb(struct journal_entry *entry, void *data)
+{
+ struct txn *txn = data;
+ (void)entry;
+
+ /*
+ * On synchronous write just wake up
+ * the waiter which will complete the
+ * transaction.
+ */
+ fiber_wakeup(txn->fiber);
+}
+
+/*
+ * Queue the entry for writing and wait until it is processed.
+ */
static int
-wal_write_in_wal_mode_none(struct journal *journal,
- struct journal_entry *entry)
+wal_write(struct journal *journal, struct journal_entry *entry)
{
- struct wal_writer *writer = (struct wal_writer *) journal;
+ struct txn *txn = in_txn();
+
+ /*
+ * Reuse the async path; wal_write_cb() wakes up txn->fiber.
+ */
+ if (wal_write_async(journal, entry, wal_write_cb, txn) != 0)
+ return -1;
+
+ bool cancellable = fiber_set_cancellable(false);
+ fiber_yield();
+ fiber_set_cancellable(cancellable);
+
+ /*
+ * Unlike the async write, restore the transaction
+ * in the fiber storage: the caller finishes the
+ * transaction itself.
+ */
+ fiber_set_txn(fiber(), txn);
+ return 0;
+}
+
+static int
+wal_write_none_async(struct journal *journal,
+ struct journal_entry *entry,
+ journal_entry_complete_cb on_complete_cb,
+ void *on_complete_cb_data)
{
+ struct wal_writer *writer = (struct wal_writer *)journal;
struct vclock vclock_diff;
+ struct txn *txn = in_txn();
+
+ entry->on_complete_cb = on_complete_cb;
+ entry->on_complete_cb_data = on_complete_cb_data;
+ fiber_set_txn(fiber(), NULL);
vclock_create(&vclock_diff);
wal_assign_lsn(&vclock_diff, &writer->vclock, entry->rows,
entry->rows + entry->n_rows);
vclock_merge(&writer->vclock, &vclock_diff);
vclock_copy(&replicaset.vclock, &writer->vclock);
entry->res = vclock_sum(&writer->vclock);
- journal_entry_complete(entry);
+ txn->signature = entry->res;
+ /* Complete async writes; NULL for wal_write_none(). */
+ if (on_complete_cb != NULL)
+ on_complete_cb(entry, on_complete_cb_data);
return 0;
}
+static int
+wal_write_none(struct journal *journal, struct journal_entry *entry)
+{
+ return wal_write_none_async(journal, entry, NULL, NULL);
+}
+
void
wal_init_vy_log()
{
--
2.20.1
More information about the Tarantool-patches
mailing list