From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from mail-lj1-f196.google.com (mail-lj1-f196.google.com [209.85.208.196]) (using TLSv1.2 with cipher ECDHE-RSA-AES128-GCM-SHA256 (128/128 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id EA77E445321 for ; Thu, 23 Jul 2020 15:30:20 +0300 (MSK) Received: by mail-lj1-f196.google.com with SMTP id h19so6090237ljg.13 for ; Thu, 23 Jul 2020 05:30:20 -0700 (PDT) From: Cyrill Gorcunov Date: Thu, 23 Jul 2020 15:29:37 +0300 Message-Id: <20200723122942.196011-3-gorcunov@gmail.com> In-Reply-To: <20200723122942.196011-1-gorcunov@gmail.com> References: <20200723122942.196011-1-gorcunov@gmail.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Subject: [Tarantool-patches] [PATCH v2 2/7] wal: bind asynchronous write completion to an entry List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: tml Cc: Vladislav Shpilevoy In commit 77ba0e3504464131fe81c672d508d0275be2173a we've redesigned wal journal operations such that asynchronous write completion is a signle instance per journal. It turned out that such simplification is too tight and doesn't allow us to pass entries into the journal with custom completions. Thus lets allow back such ability. We will need it to be able to write "confirm" records into wal directly without touching trasactions code at all. Signed-off-by: Cyrill Gorcunov --- src/box/box.cc | 14 ++++++++------ src/box/journal.c | 2 ++ src/box/journal.h | 18 +++++++++--------- src/box/txn.c | 2 +- src/box/vy_log.c | 2 +- src/box/wal.c | 20 +++++++++----------- src/box/wal.h | 4 ++-- 7 files changed, 32 insertions(+), 30 deletions(-) diff --git a/src/box/box.cc b/src/box/box.cc index 83eef5d98..7d61f2ed2 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -348,7 +348,7 @@ recovery_journal_write(struct journal *base, * Since there're no actual writes, fire a * journal_async_complete callback right away. */ - journal_async_complete(base, entry); + journal_async_complete(entry); return 0; } @@ -357,7 +357,7 @@ recovery_journal_create(struct vclock *v) { static struct recovery_journal journal; journal_create(&journal.base, recovery_journal_write, - txn_complete_async, recovery_journal_write); + recovery_journal_write); journal.vclock = v; journal_set(&journal.base); } @@ -2193,8 +2193,10 @@ engine_init() static int bootstrap_journal_write(struct journal *base, struct journal_entry *entry) { + (void)base; + entry->res = 0; - journal_async_complete(base, entry); + journal_async_complete(entry); return 0; } @@ -2580,8 +2582,8 @@ box_cfg_xc(void) int64_t wal_max_size = box_check_wal_max_size(cfg_geti64("wal_max_size")); enum wal_mode wal_mode = box_check_wal_mode(cfg_gets("wal_mode")); - if (wal_init(wal_mode, txn_complete_async, cfg_gets("wal_dir"), - wal_max_size, &INSTANCE_UUID, on_wal_garbage_collection, + if (wal_init(wal_mode, cfg_gets("wal_dir"), wal_max_size, + &INSTANCE_UUID, on_wal_garbage_collection, on_wal_checkpoint_threshold) != 0) { diag_raise(); } @@ -2628,7 +2630,7 @@ box_cfg_xc(void) } struct journal bootstrap_journal; - journal_create(&bootstrap_journal, NULL, txn_complete_async, + journal_create(&bootstrap_journal, bootstrap_journal_write, bootstrap_journal_write); journal_set(&bootstrap_journal); auto bootstrap_journal_guard = make_scoped_guard([] { diff --git a/src/box/journal.c b/src/box/journal.c index f1e89aaa2..fb81acb39 100644 --- a/src/box/journal.c +++ b/src/box/journal.c @@ -36,6 +36,7 @@ struct journal *current_journal = NULL; struct journal_entry * journal_entry_new(size_t n_rows, struct region *region, + void (*write_async_cb)(struct journal_entry *entry), void *complete_data) { struct journal_entry *entry; @@ -50,6 +51,7 @@ journal_entry_new(size_t n_rows, struct region *region, return NULL; } + entry->write_async_cb = write_async_cb; entry->complete_data = complete_data; entry->approx_len = 0; entry->n_rows = n_rows; diff --git a/src/box/journal.h b/src/box/journal.h index 9049a2ce0..74a5eb050 100644 --- a/src/box/journal.h +++ b/src/box/journal.h @@ -60,6 +60,10 @@ struct journal_entry { * A journal entry completion callback argument. */ void *complete_data; + /** + * Asynchronous write completion function. + */ + void (*write_async_cb)(struct journal_entry *entry); /** * Approximate size of this request when encoded. */ @@ -83,6 +87,7 @@ struct region; */ struct journal_entry * journal_entry_new(size_t n_rows, struct region *region, + void (*write_async_cb)(struct journal_entry *entry), void *complete_data); /** @@ -95,22 +100,19 @@ struct journal { int (*write_async)(struct journal *journal, struct journal_entry *entry); - /** Asynchronous write completion */ - void (*write_async_cb)(struct journal_entry *entry); - /** Synchronous write */ int (*write)(struct journal *journal, struct journal_entry *entry); }; /** - * Finalize a single entry. + * Complete asynchronous write. */ static inline void -journal_async_complete(struct journal *journal, struct journal_entry *entry) +journal_async_complete(struct journal_entry *entry) { - assert(journal->write_async_cb != NULL); - journal->write_async_cb(entry); + assert(entry->write_async_cb != NULL); + entry->write_async_cb(entry); } /** @@ -172,12 +174,10 @@ static inline void journal_create(struct journal *journal, int (*write_async)(struct journal *journal, struct journal_entry *entry), - void (*write_async_cb)(struct journal_entry *entry), int (*write)(struct journal *journal, struct journal_entry *entry)) { journal->write_async = write_async; - journal->write_async_cb = write_async_cb; journal->write = write; } diff --git a/src/box/txn.c b/src/box/txn.c index 9c21258c5..cc1f496c5 100644 --- a/src/box/txn.c +++ b/src/box/txn.c @@ -551,7 +551,7 @@ txn_journal_entry_new(struct txn *txn) /* Save space for an additional NOP row just in case. */ req = journal_entry_new(txn->n_new_rows + txn->n_applier_rows + 1, - &txn->region, txn); + &txn->region, txn_complete_async, txn); if (req == NULL) return NULL; diff --git a/src/box/vy_log.c b/src/box/vy_log.c index 311985c72..de4c5205c 100644 --- a/src/box/vy_log.c +++ b/src/box/vy_log.c @@ -818,7 +818,7 @@ vy_log_tx_flush(struct vy_log_tx *tx) size_t used = region_used(&fiber()->gc); struct journal_entry *entry; - entry = journal_entry_new(tx_size, &fiber()->gc, NULL); + entry = journal_entry_new(tx_size, &fiber()->gc, NULL, NULL); if (entry == NULL) goto err; diff --git a/src/box/wal.c b/src/box/wal.c index 37a8bd483..4e6025104 100644 --- a/src/box/wal.c +++ b/src/box/wal.c @@ -266,10 +266,9 @@ xlog_write_entry(struct xlog *l, struct journal_entry *entry) static void tx_schedule_queue(struct stailq *queue) { - struct wal_writer *writer = &wal_writer_singleton; struct journal_entry *req, *tmp; stailq_foreach_entry_safe(req, tmp, queue, fifo) - journal_async_complete(&writer->base, req); + journal_async_complete(req); } /** @@ -403,9 +402,8 @@ tx_notify_checkpoint(struct cmsg *msg) */ static void wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode, - void (*wall_async_cb)(struct journal_entry *entry), - const char *wal_dirname, - int64_t wal_max_size, const struct tt_uuid *instance_uuid, + const char *wal_dirname, int64_t wal_max_size, + const struct tt_uuid *instance_uuid, wal_on_garbage_collection_f on_garbage_collection, wal_on_checkpoint_threshold_f on_checkpoint_threshold) { @@ -415,7 +413,6 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode, journal_create(&writer->base, wal_mode == WAL_NONE ? wal_write_none_async : wal_write_async, - wall_async_cb, wal_mode == WAL_NONE ? wal_write_none : wal_write); @@ -525,15 +522,15 @@ wal_open(struct wal_writer *writer) } int -wal_init(enum wal_mode wal_mode, void (*wall_async_cb)(struct journal_entry *entry), - const char *wal_dirname, int64_t wal_max_size, const struct tt_uuid *instance_uuid, +wal_init(enum wal_mode wal_mode, const char *wal_dirname, + int64_t wal_max_size, const struct tt_uuid *instance_uuid, wal_on_garbage_collection_f on_garbage_collection, wal_on_checkpoint_threshold_f on_checkpoint_threshold) { /* Initialize the state. */ struct wal_writer *writer = &wal_writer_singleton; - wal_writer_create(writer, wal_mode, wall_async_cb, wal_dirname, - wal_max_size, instance_uuid, on_garbage_collection, + wal_writer_create(writer, wal_mode, wal_dirname, wal_max_size, + instance_uuid, on_garbage_collection, on_checkpoint_threshold); /* Start WAL thread. */ @@ -1304,7 +1301,8 @@ wal_write_none_async(struct journal *journal, vclock_merge(&writer->vclock, &vclock_diff); vclock_copy(&replicaset.vclock, &writer->vclock); entry->res = vclock_sum(&writer->vclock); - journal_async_complete(journal, entry); + + journal_async_complete(entry); return 0; } diff --git a/src/box/wal.h b/src/box/wal.h index f348dc636..9d0cada46 100644 --- a/src/box/wal.h +++ b/src/box/wal.h @@ -81,8 +81,8 @@ typedef void (*wal_on_checkpoint_threshold_f)(void); * Start WAL thread and initialize WAL writer. */ int -wal_init(enum wal_mode wal_mode, void (*wall_async_cb)(struct journal_entry *entry), - const char *wal_dirname, int64_t wal_max_size, const struct tt_uuid *instance_uuid, +wal_init(enum wal_mode wal_mode, const char *wal_dirname, + int64_t wal_max_size, const struct tt_uuid *instance_uuid, wal_on_garbage_collection_f on_garbage_collection, wal_on_checkpoint_threshold_f on_checkpoint_threshold); -- 2.26.2