[Tarantool-patches] [PATCH v15 11/11] box/journal: redesign journal operations
Cyrill Gorcunov
gorcunov at gmail.com
Fri Mar 20 11:19:56 MSK 2020
Currently the journal provides only one method -- write,
which implies a callback to trigger upon write completion
(in contrast to the 1.10 series, where all commits were
processed synchronously).
Let's make the difference between sync and async writes
more explicit: provide a journal::write_async method which
runs a completion function once the entry is written, while
journal::write handles the transaction synchronously.
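For illustration, a rough sketch of how the two paths look
from the txn code after this patch (names are taken from the
diff below; diag reporting and rollback handling are trimmed):

    /* Async path (txn_commit_async): queue the entry; the
     * journal's write_async_cb (txn_complete_async for the
     * WAL) fires later with entry->res already set. */
    if (journal_write_async(req) != 0)
            return -1;      /* the entry was not even queued */

    /* Sync path (txn_commit): journal_write() returns only
     * once the entry has been processed and req->res carries
     * the result of the write. */
    if (journal_write(req) != 0)
            return -1;
    return req->res >= 0 ? 0 : -1;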
Redesign notes:
1) The callback for async writes is set once at journal
creation. There is no need to carry a callback in
every journal entry, which saves some memory;
2) txn_commit and txn_commit_async call txn_rollback
where appropriate;
3) there is no need to call journal_entry_complete on sync
writes anymore;
4) wal_write_in_wal_mode_none is too long a name, so it is
renamed to wal_write_none;
5) the wal engine uses async writes internally, but this is
transparent to callers.
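Note 1 in practice: a backend wires its callbacks exactly once
when its journal is created, and a backend which cannot serve
async writes simply plugs in the provided stubs (this is what
recovery_journal_create does in the diff below):

    journal_create(&journal.base,
                   journal_no_write_async,    /* write_async: always fails */
                   journal_no_write_async_cb, /* write_async_cb stub */
                   recovery_journal_write,    /* write */
                   NULL);                     /* destroy */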
Signed-off-by: Cyrill Gorcunov <gorcunov at gmail.com>
---
src/box/box.cc | 8 ++---
src/box/journal.c | 33 +++++++++++++-----
src/box/journal.h | 86 +++++++++++++++++++++++++++++++----------------
src/box/txn.c | 61 +++++++++++++++++----------------
src/box/txn.h | 2 +-
src/box/vy_log.c | 5 +--
src/box/wal.c | 61 ++++++++++++++++++++++++++-------
src/box/wal.h | 4 +--
8 files changed, 171 insertions(+), 89 deletions(-)
diff --git a/src/box/box.cc b/src/box/box.cc
index cf79affca..3a3bda78e 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -322,7 +322,6 @@ recovery_journal_write(struct journal *base,
{
struct recovery_journal *journal = (struct recovery_journal *) base;
entry->res = vclock_sum(journal->vclock);
- journal_entry_complete(entry);
return 0;
}
@@ -330,8 +329,9 @@ static void
recovery_journal_create(struct vclock *v)
{
static struct recovery_journal journal;
-
- journal_create(&journal.base, recovery_journal_write, NULL);
+ journal_create(&journal.base, journal_no_write_async,
+ journal_no_write_async_cb,
+ recovery_journal_write, NULL);
journal.vclock = v;
journal_set(&journal.base);
}
@@ -2353,7 +2353,7 @@ box_cfg_xc(void)
int64_t wal_max_size = box_check_wal_max_size(cfg_geti64("wal_max_size"));
enum wal_mode wal_mode = box_check_wal_mode(cfg_gets("wal_mode"));
- if (wal_init(wal_mode, cfg_gets("wal_dir"),
+ if (wal_init(wal_mode, txn_complete_async, cfg_gets("wal_dir"),
wal_max_size, &INSTANCE_UUID, on_wal_garbage_collection,
on_wal_checkpoint_threshold) != 0) {
diag_raise();
diff --git a/src/box/journal.c b/src/box/journal.c
index 11e78990d..b7535dc68 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -32,6 +32,24 @@
#include <small/region.h>
#include <diag.h>
+int
+journal_no_write_async(struct journal *journal,
+ struct journal_entry *entry)
+{
+ (void)journal;
+
+ assert(true);
+ entry->res = -1;
+ return -1;
+}
+
+void
+journal_no_write_async_cb(struct journal_entry *entry)
+{
+ assert(true);
+ entry->res = -1;
+}
+
/**
* Used to load from a memtx snapshot. LSN is not used,
* but txn_commit() must work.
@@ -41,21 +59,20 @@ dummy_journal_write(struct journal *journal, struct journal_entry *entry)
{
(void) journal;
entry->res = 0;
- journal_entry_complete(entry);
return 0;
}
static struct journal dummy_journal = {
- dummy_journal_write,
- NULL,
+ .write_async = journal_no_write_async,
+ .write_async_cb = journal_no_write_async_cb,
+ .write = dummy_journal_write,
};
struct journal *current_journal = &dummy_journal;
struct journal_entry *
journal_entry_new(size_t n_rows, struct region *region,
- journal_entry_complete_cb on_complete_cb,
- void *on_complete_cb_data)
+ void *complete_data)
{
struct journal_entry *entry;
@@ -68,11 +85,11 @@ journal_entry_new(size_t n_rows, struct region *region,
diag_set(OutOfMemory, size, "region", "struct journal_entry");
return NULL;
}
+
+ entry->complete_data = complete_data;
entry->approx_len = 0;
entry->n_rows = n_rows;
entry->res = -1;
- entry->on_complete_cb = on_complete_cb;
- entry->on_complete_cb_data = on_complete_cb_data;
+
return entry;
}
-
diff --git a/src/box/journal.h b/src/box/journal.h
index 64f167c6f..a3f3170ed 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -42,9 +42,6 @@ extern "C" {
struct xrow_header;
struct journal_entry;
-/** Journal entry finalization callback typedef. */
-typedef void (*journal_entry_complete_cb)(struct journal_entry *entry, void *data);
-
/**
* An entry for an abstract journal.
* Simply put, a write ahead log request.
@@ -60,17 +57,10 @@ struct journal_entry {
* the committed transaction, on error is -1
*/
int64_t res;
- /**
- * A journal entry finalization callback which is going to be called
- * after the entry processing was finished in both cases: success
- * or fail. Entry->res is set to a result value before the callback
- * is fired.
- */
- journal_entry_complete_cb on_complete_cb;
/**
* A journal entry completion callback argument.
*/
- void *on_complete_cb_data;
+ void *complete_data;
/**
* Approximate size of this request when encoded.
*/
@@ -94,17 +84,7 @@ struct region;
*/
struct journal_entry *
journal_entry_new(size_t n_rows, struct region *region,
- journal_entry_complete_cb on_complete_cb,
- void *on_complete_cb_data);
-
-/**
- * Finalize a single entry.
- */
-static inline void
-journal_entry_complete(struct journal_entry *entry)
-{
- entry->on_complete_cb(entry, entry->on_complete_cb_data);
-}
+ void *complete_data);
/**
* An API for an abstract journal for all transactions of this
@@ -112,11 +92,31 @@ journal_entry_complete(struct journal_entry *entry)
* synchronous replication.
*/
struct journal {
+ /** Asynchronous write */
+ int (*write_async)(struct journal *journal,
+ struct journal_entry *entry);
+
+ /** Asynchronous write completion */
+ void (*write_async_cb)(struct journal_entry *entry);
+
+ /** Synchronous write */
int (*write)(struct journal *journal,
- struct journal_entry *req);
+ struct journal_entry *entry);
+
+ /** Journal destroy */
void (*destroy)(struct journal *journal);
};
+/**
+ * Finalize a single entry.
+ */
+static inline void
+journal_async_complete(struct journal *journal, struct journal_entry *entry)
+{
+ assert(journal->write_async_cb != NULL);
+ journal->write_async_cb(entry);
+}
+
/**
* Depending on the step of recovery and instance configuration
* points at a concrete implementation of the journal.
@@ -124,9 +124,9 @@ struct journal {
extern struct journal *current_journal;
/**
- * Send a single entry to write.
+ * Write a single entry to the journal in a synchronous way.
*
- * @return 0 if write was scheduled or -1 in case of an error.
+ * @return 0 if write was processed by a backend or -1 in case of an error.
*/
static inline int
journal_write(struct journal_entry *entry)
@@ -134,6 +134,17 @@ journal_write(struct journal_entry *entry)
return current_journal->write(current_journal, entry);
}
+/**
+ * Queue a single entry to the journal in an asynchronous way.
+ *
+ * @return 0 if write was queued to a backend or -1 in case of an error.
+ */
+static inline int
+journal_write_async(struct journal_entry *entry)
+{
+ return current_journal->write_async(current_journal, entry);
+}
+
/**
* Change the current implementation of the journaling API.
* Happens during life cycle of an instance:
@@ -165,13 +176,30 @@ journal_set(struct journal *new_journal)
static inline void
journal_create(struct journal *journal,
- int (*write)(struct journal *, struct journal_entry *),
- void (*destroy)(struct journal *))
+ int (*write_async)(struct journal *journal,
+ struct journal_entry *entry),
+ void (*write_async_cb)(struct journal_entry *entry),
+ int (*write)(struct journal *journal,
+ struct journal_entry *entry),
+ void (*destroy)(struct journal *journal))
{
- journal->write = write;
- journal->destroy = destroy;
+ journal->write_async = write_async;
+ journal->write_async_cb = write_async_cb;
+ journal->write = write;
+ journal->destroy = destroy;
}
+/**
+ * A stub to issue an error when asynchronous
+ * writes are disabled in the backend.
+ */
+extern int
+journal_no_write_async(struct journal *journal,
+ struct journal_entry *entry);
+
+extern void
+journal_no_write_async_cb(struct journal_entry *entry);
+
static inline bool
journal_is_initialized(struct journal *journal)
{
diff --git a/src/box/txn.c b/src/box/txn.c
index 11c20aceb..b42df3df6 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -463,9 +463,9 @@ txn_complete(struct txn *txn)
}
void
-txn_complete_async(struct journal_entry *entry, void *complete_data)
+txn_complete_async(struct journal_entry *entry)
{
- struct txn *txn = complete_data;
+ struct txn *txn = entry->complete_data;
txn->signature = entry->res;
/*
* Some commit/rollback triggers require for in_txn fiber
@@ -487,7 +487,7 @@ txn_journal_entry_new(struct txn *txn)
assert(txn->n_new_rows + txn->n_applier_rows > 0);
req = journal_entry_new(txn->n_new_rows + txn->n_applier_rows,
- &txn->region, txn_complete_async, txn);
+ &txn->region, txn);
if (req == NULL)
return NULL;
@@ -518,24 +518,6 @@ txn_journal_entry_new(struct txn *txn)
return req;
}
-static int64_t
-txn_write_to_wal(struct journal_entry *req)
-{
- /*
- * Send the entry to the journal.
- *
- * After this point the transaction must not be used
- * so reset the corresponding key in the fiber storage.
- */
- fiber_set_txn(fiber(), NULL);
- if (journal_write(req) < 0) {
- diag_set(ClientError, ER_WAL_IO);
- diag_log();
- return -1;
- }
- return 0;
-}
-
/*
* Prepare a transaction using engines.
*/
@@ -608,7 +590,17 @@ txn_commit_async(struct txn *txn)
return -1;
}
- return txn_write_to_wal(req);
+ fiber_set_txn(fiber(), NULL);
+ if (journal_write_async(req) != 0) {
+ fiber_set_txn(fiber(), txn);
+ txn_rollback(txn);
+
+ diag_set(ClientError, ER_WAL_IO);
+ diag_log();
+ return -1;
+ }
+
+ return 0;
}
int
@@ -636,21 +628,28 @@ txn_commit(struct txn *txn)
return -1;
}
- if (txn_write_to_wal(req) != 0)
+ fiber_set_txn(fiber(), NULL);
+ if (journal_write(req) != 0) {
+ fiber_set_txn(fiber(), txn);
+ txn_rollback(txn);
+ txn_free(txn);
+
+ diag_set(ClientError, ER_WAL_IO);
+ diag_log();
return -1;
+ }
- /*
- * In case of non-yielding journal the transaction could already
- * be done and there is nothing to wait in such cases.
- */
if (!txn_has_flag(txn, TXN_IS_DONE)) {
- bool cancellable = fiber_set_cancellable(false);
- fiber_yield();
- fiber_set_cancellable(cancellable);
+ txn->signature = req->res;
+ txn_complete(txn);
+ fiber_set_txn(fiber(), NULL);
}
+
int res = txn->signature >= 0 ? 0 : -1;
- if (res != 0)
+ if (res != 0) {
diag_set(ClientError, ER_WAL_IO);
+ diag_log();
+ }
/* Synchronous transactions are freed by the calling fiber. */
txn_free(txn);
diff --git a/src/box/txn.h b/src/box/txn.h
index 572c76d84..3f6d79d5c 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -292,7 +292,7 @@ txn_rollback(struct txn *txn);
* Complete asynchronous transaction.
*/
void
-txn_complete_async(struct journal_entry *entry, void *complete_data);
+txn_complete_async(struct journal_entry *entry);
/**
* Submit a transaction to the journal.
diff --git a/src/box/vy_log.c b/src/box/vy_log.c
index cb291f3c8..9ead066af 100644
--- a/src/box/vy_log.c
+++ b/src/box/vy_log.c
@@ -815,8 +815,9 @@ vy_log_tx_flush(struct vy_log_tx *tx)
tx_size++;
size_t used = region_used(&fiber()->gc);
- struct journal_entry *entry = journal_entry_new(tx_size, &fiber()->gc,
- NULL, NULL);
+
+ struct journal_entry *entry;
+ entry = journal_entry_new(tx_size, &fiber()->gc, NULL);
if (entry == NULL)
goto err;
diff --git a/src/box/wal.c b/src/box/wal.c
index 1668c9348..3b094b0e8 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -60,11 +60,17 @@ const char *wal_mode_STRS[] = { "none", "write", "fsync", NULL };
int wal_dir_lock = -1;
+static int
+wal_write_async(struct journal *, struct journal_entry *);
+
static int
wal_write(struct journal *, struct journal_entry *);
static int
-wal_write_in_wal_mode_none(struct journal *, struct journal_entry *);
+wal_write_none_async(struct journal *, struct journal_entry *);
+
+static int
+wal_write_none(struct journal *, struct journal_entry *);
/*
* WAL writer - maintain a Write Ahead Log for every change
@@ -253,9 +259,10 @@ xlog_write_entry(struct xlog *l, struct journal_entry *entry)
static void
tx_schedule_queue(struct stailq *queue)
{
+ struct wal_writer *writer = &wal_writer_singleton;
struct journal_entry *req, *tmp;
stailq_foreach_entry_safe(req, tmp, queue, fifo)
- journal_entry_complete(req);
+ journal_async_complete(&writer->base, req);
}
/**
@@ -342,6 +349,7 @@ tx_notify_checkpoint(struct cmsg *msg)
*/
static void
wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
+ void (*wall_async_cb)(struct journal_entry *entry),
const char *wal_dirname,
int64_t wal_max_size, const struct tt_uuid *instance_uuid,
wal_on_garbage_collection_f on_garbage_collection,
@@ -349,8 +357,14 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
{
writer->wal_mode = wal_mode;
writer->wal_max_size = wal_max_size;
- journal_create(&writer->base, wal_mode == WAL_NONE ?
- wal_write_in_wal_mode_none : wal_write, NULL);
+
+ journal_create(&writer->base,
+ wal_mode == WAL_NONE ?
+ wal_write_none_async : wal_write_async,
+ wall_async_cb,
+ wal_mode == WAL_NONE ?
+ wal_write_none : wal_write,
+ NULL);
struct xlog_opts opts = xlog_opts_default;
opts.sync_is_async = true;
@@ -458,14 +472,14 @@ wal_open(struct wal_writer *writer)
}
int
-wal_init(enum wal_mode wal_mode, const char *wal_dirname,
- int64_t wal_max_size, const struct tt_uuid *instance_uuid,
+wal_init(enum wal_mode wal_mode, void (*wall_async_cb)(struct journal_entry *entry),
+ const char *wal_dirname, int64_t wal_max_size, const struct tt_uuid *instance_uuid,
wal_on_garbage_collection_f on_garbage_collection,
wal_on_checkpoint_threshold_f on_checkpoint_threshold)
{
/* Initialize the state. */
struct wal_writer *writer = &wal_writer_singleton;
- wal_writer_create(writer, wal_mode, wal_dirname,
+ wal_writer_create(writer, wal_mode, wall_async_cb, wal_dirname,
wal_max_size, instance_uuid, on_garbage_collection,
on_checkpoint_threshold);
@@ -1170,7 +1184,7 @@ wal_writer_f(va_list ap)
* to be written to disk.
*/
static int
-wal_write(struct journal *journal, struct journal_entry *entry)
+wal_write_async(struct journal *journal, struct journal_entry *entry)
{
struct wal_writer *writer = (struct wal_writer *) journal;
@@ -1221,26 +1235,49 @@ wal_write(struct journal *journal, struct journal_entry *entry)
fail:
entry->res = -1;
- journal_entry_complete(entry);
return -1;
}
static int
-wal_write_in_wal_mode_none(struct journal *journal,
- struct journal_entry *entry)
+wal_write(struct journal *journal, struct journal_entry *entry)
+{
+ /*
+ * We can reuse the async WAL engine transparently
+ * to the caller.
+ */
+ if (wal_write_async(journal, entry) != 0)
+ return -1;
+
+ bool cancellable = fiber_set_cancellable(false);
+ fiber_yield();
+ fiber_set_cancellable(cancellable);
+
+ return 0;
+}
+
+static int
+wal_write_none_async(struct journal *journal,
+ struct journal_entry *entry)
{
struct wal_writer *writer = (struct wal_writer *) journal;
struct vclock vclock_diff;
+
vclock_create(&vclock_diff);
wal_assign_lsn(&vclock_diff, &writer->vclock, entry->rows,
entry->rows + entry->n_rows);
vclock_merge(&writer->vclock, &vclock_diff);
vclock_copy(&replicaset.vclock, &writer->vclock);
entry->res = vclock_sum(&writer->vclock);
- journal_entry_complete(entry);
+
return 0;
}
+static int
+wal_write_none(struct journal *journal, struct journal_entry *entry)
+{
+ return wal_write_none_async(journal, entry);
+}
+
void
wal_init_vy_log()
{
diff --git a/src/box/wal.h b/src/box/wal.h
index 76b44941a..11a66a20a 100644
--- a/src/box/wal.h
+++ b/src/box/wal.h
@@ -81,8 +81,8 @@ typedef void (*wal_on_checkpoint_threshold_f)(void);
* Start WAL thread and initialize WAL writer.
*/
int
-wal_init(enum wal_mode wal_mode, const char *wal_dirname,
- int64_t wal_max_size, const struct tt_uuid *instance_uuid,
+wal_init(enum wal_mode wal_mode, void (*wall_async_cb)(struct journal_entry *entry),
+ const char *wal_dirname, int64_t wal_max_size, const struct tt_uuid *instance_uuid,
wal_on_garbage_collection_f on_garbage_collection,
wal_on_checkpoint_threshold_f on_checkpoint_threshold);
--
2.20.1