[Tarantool-patches] [PATCH v9 1/7] journal: bind asynchronous write completion to an entry
Serge Petrenko
sergepetrenko at tarantool.org
Fri Aug 21 10:48:14 MSK 2020
Hi! Thanks for the patch! LGTM.
>Четверг, 20 августа 2020, 0:35 +03:00 от Cyrill Gorcunov <gorcunov at gmail.com>:
>
>In commit 77ba0e3504464131fe81c672d508d0275be2173a we redesigned
>WAL journal operations so that the asynchronous write completion
>callback is a single instance per journal.
>
>It turned out that this simplification is too restrictive and doesn't
>allow us to pass entries into the journal with custom completions.
>
>So let's restore this ability. We will need it to be able
>to write "confirm" records into the WAL directly without touching
>the transaction code at all.
>
>Part-of #5129
>
>Signed-off-by: Cyrill Gorcunov < gorcunov at gmail.com >
>---
> src/box/box.cc | 15 ++++++++-------
> src/box/journal.c | 2 ++
> src/box/journal.h | 20 +++++++++++---------
> src/box/txn.c | 2 +-
> src/box/vy_log.c | 2 +-
> src/box/wal.c | 19 ++++++++-----------
> src/box/wal.h | 4 ++--
> 7 files changed, 33 insertions(+), 31 deletions(-)
>
>diff --git a/src/box/box.cc b/src/box/box.cc
>index 8e811e9c1..faffd5769 100644
>--- a/src/box/box.cc
>+++ b/src/box/box.cc
>@@ -348,7 +348,7 @@ recovery_journal_write(struct journal *base,
> * Since there're no actual writes, fire a
> * journal_async_complete callback right away.
> */
>- journal_async_complete(base, entry);
>+ journal_async_complete(entry);
> return 0;
> }
>
>@@ -357,7 +357,7 @@ recovery_journal_create(struct vclock *v)
> {
> static struct recovery_journal journal;
> journal_create(&journal.base, recovery_journal_write,
>- txn_complete_async, recovery_journal_write);
>+ recovery_journal_write);
> journal.vclock = v;
> journal_set(&journal.base);
> }
>@@ -2182,8 +2182,10 @@ engine_init()
> static int
> bootstrap_journal_write(struct journal *base, struct journal_entry *entry)
> {
>+ (void)base;
>+
> entry->res = 0;
>- journal_async_complete(base, entry);
>+ journal_async_complete(entry);
> return 0;
> }
>
>@@ -2569,8 +2571,8 @@ box_cfg_xc(void)
>
> int64_t wal_max_size = box_check_wal_max_size(cfg_geti64("wal_max_size"));
> enum wal_mode wal_mode = box_check_wal_mode(cfg_gets("wal_mode"));
>- if (wal_init(wal_mode, txn_complete_async, cfg_gets("wal_dir"),
>- wal_max_size, &INSTANCE_UUID, on_wal_garbage_collection,
>+ if (wal_init(wal_mode, cfg_gets("wal_dir"), wal_max_size,
>+ &INSTANCE_UUID, on_wal_garbage_collection,
> on_wal_checkpoint_threshold) != 0) {
> diag_raise();
> }
>@@ -2617,8 +2619,7 @@ box_cfg_xc(void)
> }
>
> struct journal bootstrap_journal;
>- journal_create(&bootstrap_journal, NULL, txn_complete_async,
>- bootstrap_journal_write);
>+ journal_create(&bootstrap_journal, NULL, bootstrap_journal_write);
> journal_set(&bootstrap_journal);
> auto bootstrap_journal_guard = make_scoped_guard([] {
> journal_set(NULL);
>diff --git a/src/box/journal.c b/src/box/journal.c
>index f1e89aaa2..48af9157b 100644
>--- a/src/box/journal.c
>+++ b/src/box/journal.c
>@@ -36,6 +36,7 @@ struct journal *current_journal = NULL;
>
> struct journal_entry *
> journal_entry_new(size_t n_rows, struct region *region,
>+ journal_write_async_f write_async_cb,
> void *complete_data)
> {
> struct journal_entry *entry;
>@@ -50,6 +51,7 @@ journal_entry_new(size_t n_rows, struct region *region,
> return NULL;
> }
>
>+ entry->write_async_cb = write_async_cb;
> entry->complete_data = complete_data;
> entry->approx_len = 0;
> entry->n_rows = n_rows;
>diff --git a/src/box/journal.h b/src/box/journal.h
>index 1a10e66c3..4b019fecf 100644
>--- a/src/box/journal.h
>+++ b/src/box/journal.h
>@@ -42,6 +42,8 @@ extern "C" {
> struct xrow_header;
> struct journal_entry;
>
>+typedef void (*journal_write_async_f)(struct journal_entry *entry);
>+
> /**
> * An entry for an abstract journal.
> * Simply put, a write ahead log request.
>@@ -61,6 +63,10 @@ struct journal_entry {
> * A journal entry completion callback argument.
> */
> void *complete_data;
>+ /**
>+ * Asynchronous write completion function.
>+ */
>+ journal_write_async_f write_async_cb;
> /**
> * Approximate size of this request when encoded.
> */
>@@ -84,6 +90,7 @@ struct region;
> */
> struct journal_entry *
> journal_entry_new(size_t n_rows, struct region *region,
>+ journal_write_async_f write_async_cb,
> void *complete_data);
>
> /**
>@@ -96,22 +103,19 @@ struct journal {
> int (*write_async)(struct journal *journal,
> struct journal_entry *entry);
>
>- /** Asynchronous write completion */
>- void (*write_async_cb)(struct journal_entry *entry);
>-
> /** Synchronous write */
> int (*write)(struct journal *journal,
> struct journal_entry *entry);
> };
>
> /**
>- * Finalize a single entry.
>+ * Complete asynchronous write.
> */
> static inline void
>-journal_async_complete(struct journal *journal, struct journal_entry *entry)
>+journal_async_complete(struct journal_entry *entry)
> {
>- assert(journal->write_async_cb != NULL);
>- journal->write_async_cb(entry);
>+ assert(entry->write_async_cb != NULL);
>+ entry->write_async_cb(entry);
> }
>
> /**
>@@ -173,12 +177,10 @@ static inline void
> journal_create(struct journal *journal,
> int (*write_async)(struct journal *journal,
> struct journal_entry *entry),
>- void (*write_async_cb)(struct journal_entry *entry),
> int (*write)(struct journal *journal,
> struct journal_entry *entry))
> {
> journal->write_async = write_async;
>- journal->write_async_cb = write_async_cb;
> journal->write = write;
> }
>
>diff --git a/src/box/txn.c b/src/box/txn.c
>index 9c21258c5..cc1f496c5 100644
>--- a/src/box/txn.c
>+++ b/src/box/txn.c
>@@ -551,7 +551,7 @@ txn_journal_entry_new(struct txn *txn)
>
> /* Save space for an additional NOP row just in case. */
> req = journal_entry_new(txn->n_new_rows + txn->n_applier_rows + 1,
>- &txn->region, txn);
>+ &txn->region, txn_complete_async, txn);
> if (req == NULL)
> return NULL;
>
>diff --git a/src/box/vy_log.c b/src/box/vy_log.c
>index 311985c72..de4c5205c 100644
>--- a/src/box/vy_log.c
>+++ b/src/box/vy_log.c
>@@ -818,7 +818,7 @@ vy_log_tx_flush(struct vy_log_tx *tx)
> size_t used = region_used(&fiber()->gc);
>
> struct journal_entry *entry;
>- entry = journal_entry_new(tx_size, &fiber()->gc, NULL);
>+ entry = journal_entry_new(tx_size, &fiber()->gc, NULL, NULL);
> if (entry == NULL)
> goto err;
>
>diff --git a/src/box/wal.c b/src/box/wal.c
>index d8c92aa36..045006b60 100644
>--- a/src/box/wal.c
>+++ b/src/box/wal.c
>@@ -266,10 +266,9 @@ xlog_write_entry(struct xlog *l, struct journal_entry *entry)
> static void
> tx_schedule_queue(struct stailq *queue)
> {
>- struct wal_writer *writer = &wal_writer_singleton;
> struct journal_entry *req, *tmp;
> stailq_foreach_entry_safe(req, tmp, queue, fifo)
>- journal_async_complete(&writer->base, req);
>+ journal_async_complete(req);
> }
>
> /**
>@@ -403,9 +402,8 @@ tx_notify_checkpoint(struct cmsg *msg)
> */
> static void
> wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
>- void (*wall_async_cb)(struct journal_entry *entry),
>- const char *wal_dirname,
>- int64_t wal_max_size, const struct tt_uuid *instance_uuid,
>+ const char *wal_dirname, int64_t wal_max_size,
>+ const struct tt_uuid *instance_uuid,
> wal_on_garbage_collection_f on_garbage_collection,
> wal_on_checkpoint_threshold_f on_checkpoint_threshold)
> {
>@@ -415,7 +413,6 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
> journal_create(&writer->base,
> wal_mode == WAL_NONE ?
> wal_write_none_async : wal_write_async,
>- wall_async_cb,
> wal_mode == WAL_NONE ?
> wal_write_none : wal_write);
>
>@@ -525,15 +522,15 @@ wal_open(struct wal_writer *writer)
> }
>
> int
>-wal_init(enum wal_mode wal_mode, void (*wall_async_cb)(struct journal_entry *entry),
>- const char *wal_dirname, int64_t wal_max_size, const struct tt_uuid *instance_uuid,
>+wal_init(enum wal_mode wal_mode, const char *wal_dirname,
>+ int64_t wal_max_size, const struct tt_uuid *instance_uuid,
> wal_on_garbage_collection_f on_garbage_collection,
> wal_on_checkpoint_threshold_f on_checkpoint_threshold)
> {
> /* Initialize the state. */
> struct wal_writer *writer = &wal_writer_singleton;
>- wal_writer_create(writer, wal_mode, wall_async_cb, wal_dirname,
>- wal_max_size, instance_uuid, on_garbage_collection,
>+ wal_writer_create(writer, wal_mode, wal_dirname, wal_max_size,
>+ instance_uuid, on_garbage_collection,
> on_checkpoint_threshold);
>
> /* Start WAL thread. */
>@@ -1314,7 +1311,7 @@ wal_write_none_async(struct journal *journal,
> vclock_merge(&writer->vclock, &vclock_diff);
> vclock_copy(&replicaset.vclock, &writer->vclock);
> entry->res = vclock_sum(&writer->vclock);
>- journal_async_complete(journal, entry);
>+ journal_async_complete(entry);
> return 0;
> }
>
>diff --git a/src/box/wal.h b/src/box/wal.h
>index f348dc636..9d0cada46 100644
>--- a/src/box/wal.h
>+++ b/src/box/wal.h
>@@ -81,8 +81,8 @@ typedef void (*wal_on_checkpoint_threshold_f)(void);
> * Start WAL thread and initialize WAL writer.
> */
> int
>-wal_init(enum wal_mode wal_mode, void (*wall_async_cb)(struct journal_entry *entry),
>- const char *wal_dirname, int64_t wal_max_size, const struct tt_uuid *instance_uuid,
>+wal_init(enum wal_mode wal_mode, const char *wal_dirname,
>+ int64_t wal_max_size, const struct tt_uuid *instance_uuid,
> wal_on_garbage_collection_f on_garbage_collection,
> wal_on_checkpoint_threshold_f on_checkpoint_threshold);
>
>--
>2.26.2
--
Serge Petrenko
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.tarantool.org/pipermail/tarantool-patches/attachments/20200821/6b4462a4/attachment.html>
More information about the Tarantool-patches
mailing list