[Tarantool-patches] [PATCH v9 1/7] journal: bind asynchronous write completion to an entry

Serge Petrenko sergepetrenko at tarantool.org
Fri Aug 21 10:48:14 MSK 2020


Hi! Thanks for the patch! LGTM.
  
>Thursday, 20 August 2020, 0:35 +03:00, from Cyrill Gorcunov <gorcunov at gmail.com>:
> 
>In commit 77ba0e3504464131fe81c672d508d0275be2173a we've redesigned
>WAL journal operations such that the asynchronous write completion
>is a single instance per journal.
>
>It turned out that such a simplification is too restrictive: it doesn't
>allow us to pass entries into the journal with custom completions.
>
>Thus let's bring this ability back. We will need it to be able
>to write "confirm" records into the WAL directly without touching
>the transactions code at all.
>
>Part-of #5129
>
>Signed-off-by: Cyrill Gorcunov <gorcunov at gmail.com>
>---
> src/box/box.cc | 15 ++++++++-------
> src/box/journal.c | 2 ++
> src/box/journal.h | 20 +++++++++++---------
> src/box/txn.c | 2 +-
> src/box/vy_log.c | 2 +-
> src/box/wal.c | 19 ++++++++-----------
> src/box/wal.h | 4 ++--
> 7 files changed, 33 insertions(+), 31 deletions(-)
>
>diff --git a/src/box/box.cc b/src/box/box.cc
>index 8e811e9c1..faffd5769 100644
>--- a/src/box/box.cc
>+++ b/src/box/box.cc
>@@ -348,7 +348,7 @@ recovery_journal_write(struct journal *base,
>  * Since there're no actual writes, fire a
>  * journal_async_complete callback right away.
>  */
>- journal_async_complete(base, entry);
>+ journal_async_complete(entry);
>  return 0;
> }
> 
>@@ -357,7 +357,7 @@ recovery_journal_create(struct vclock *v)
> {
>  static struct recovery_journal journal;
>  journal_create(&journal.base, recovery_journal_write,
>- txn_complete_async, recovery_journal_write);
>+ recovery_journal_write);
>  journal.vclock = v;
>  journal_set(&journal.base);
> }
>@@ -2182,8 +2182,10 @@ engine_init()
> static int
> bootstrap_journal_write(struct journal *base, struct journal_entry *entry)
> {
>+ (void)base;
>+
>  entry->res = 0;
>- journal_async_complete(base, entry);
>+ journal_async_complete(entry);
>  return 0;
> }
> 
>@@ -2569,8 +2571,8 @@ box_cfg_xc(void)
> 
>  int64_t wal_max_size = box_check_wal_max_size(cfg_geti64("wal_max_size"));
>  enum wal_mode wal_mode = box_check_wal_mode(cfg_gets("wal_mode"));
>- if (wal_init(wal_mode, txn_complete_async, cfg_gets("wal_dir"),
>- wal_max_size, &INSTANCE_UUID, on_wal_garbage_collection,
>+ if (wal_init(wal_mode, cfg_gets("wal_dir"), wal_max_size,
>+ &INSTANCE_UUID, on_wal_garbage_collection,
>  on_wal_checkpoint_threshold) != 0) {
>  diag_raise();
>  }
>@@ -2617,8 +2619,7 @@ box_cfg_xc(void)
>  }
> 
>  struct journal bootstrap_journal;
>- journal_create(&bootstrap_journal, NULL, txn_complete_async,
>- bootstrap_journal_write);
>+ journal_create(&bootstrap_journal, NULL, bootstrap_journal_write);
>  journal_set(&bootstrap_journal);
>  auto bootstrap_journal_guard = make_scoped_guard([] {
>  journal_set(NULL);
>diff --git a/src/box/journal.c b/src/box/journal.c
>index f1e89aaa2..48af9157b 100644
>--- a/src/box/journal.c
>+++ b/src/box/journal.c
>@@ -36,6 +36,7 @@ struct journal *current_journal = NULL;
> 
> struct journal_entry *
> journal_entry_new(size_t n_rows, struct region *region,
>+ journal_write_async_f write_async_cb,
>  void *complete_data)
> {
>  struct journal_entry *entry;
>@@ -50,6 +51,7 @@ journal_entry_new(size_t n_rows, struct region *region,
>  return NULL;
>  }
> 
>+ entry->write_async_cb = write_async_cb;
>  entry->complete_data = complete_data;
>  entry->approx_len = 0;
>  entry->n_rows = n_rows;
>diff --git a/src/box/journal.h b/src/box/journal.h
>index 1a10e66c3..4b019fecf 100644
>--- a/src/box/journal.h
>+++ b/src/box/journal.h
>@@ -42,6 +42,8 @@ extern "C" {
> struct xrow_header;
> struct journal_entry;
> 
>+typedef void (*journal_write_async_f)(struct journal_entry *entry);
>+
> /**
>  * An entry for an abstract journal.
>  * Simply put, a write ahead log request.
>@@ -61,6 +63,10 @@ struct journal_entry {
>  * A journal entry completion callback argument.
>  */
>  void *complete_data;
>+ /**
>+ * Asynchronous write completion function.
>+ */
>+ journal_write_async_f write_async_cb;
>  /**
>  * Approximate size of this request when encoded.
>  */
>@@ -84,6 +90,7 @@ struct region;
>  */
> struct journal_entry *
> journal_entry_new(size_t n_rows, struct region *region,
>+ journal_write_async_f write_async_cb,
>  void *complete_data);
> 
> /**
>@@ -96,22 +103,19 @@ struct journal {
>  int (*write_async)(struct journal *journal,
>  struct journal_entry *entry);
> 
>- /** Asynchronous write completion */
>- void (*write_async_cb)(struct journal_entry *entry);
>-
>  /** Synchronous write */
>  int (*write)(struct journal *journal,
>  struct journal_entry *entry);
> };
> 
> /**
>- * Finalize a single entry.
>+ * Complete asynchronous write.
>  */
> static inline void
>-journal_async_complete(struct journal *journal, struct journal_entry *entry)
>+journal_async_complete(struct journal_entry *entry)
> {
>- assert(journal->write_async_cb != NULL);
>- journal->write_async_cb(entry);
>+ assert(entry->write_async_cb != NULL);
>+ entry->write_async_cb(entry);
> }
> 
> /**
>@@ -173,12 +177,10 @@ static inline void
> journal_create(struct journal *journal,
>  int (*write_async)(struct journal *journal,
>  struct journal_entry *entry),
>- void (*write_async_cb)(struct journal_entry *entry),
>  int (*write)(struct journal *journal,
>  struct journal_entry *entry))
> {
>  journal->write_async = write_async;
>- journal->write_async_cb = write_async_cb;
>  journal->write = write;
> }
> 
>diff --git a/src/box/txn.c b/src/box/txn.c
>index 9c21258c5..cc1f496c5 100644
>--- a/src/box/txn.c
>+++ b/src/box/txn.c
>@@ -551,7 +551,7 @@ txn_journal_entry_new(struct txn *txn)
> 
>  /* Save space for an additional NOP row just in case. */
>  req = journal_entry_new(txn->n_new_rows + txn->n_applier_rows + 1,
>- &txn->region, txn);
>+ &txn->region, txn_complete_async, txn);
>  if (req == NULL)
>  return NULL;
> 
>diff --git a/src/box/vy_log.c b/src/box/vy_log.c
>index 311985c72..de4c5205c 100644
>--- a/src/box/vy_log.c
>+++ b/src/box/vy_log.c
>@@ -818,7 +818,7 @@ vy_log_tx_flush(struct vy_log_tx *tx)
>  size_t used = region_used(&fiber()->gc);
> 
>  struct journal_entry *entry;
>- entry = journal_entry_new(tx_size, &fiber()->gc, NULL);
>+ entry = journal_entry_new(tx_size, &fiber()->gc, NULL, NULL);
>  if (entry == NULL)
>  goto err;
> 
>diff --git a/src/box/wal.c b/src/box/wal.c
>index d8c92aa36..045006b60 100644
>--- a/src/box/wal.c
>+++ b/src/box/wal.c
>@@ -266,10 +266,9 @@ xlog_write_entry(struct xlog *l, struct journal_entry *entry)
> static void
> tx_schedule_queue(struct stailq *queue)
> {
>- struct wal_writer *writer = &wal_writer_singleton;
>  struct journal_entry *req, *tmp;
>  stailq_foreach_entry_safe(req, tmp, queue, fifo)
>- journal_async_complete(&writer->base, req);
>+ journal_async_complete(req);
> }
> 
> /**
>@@ -403,9 +402,8 @@ tx_notify_checkpoint(struct cmsg *msg)
>  */
> static void
> wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
>- void (*wall_async_cb)(struct journal_entry *entry),
>- const char *wal_dirname,
>- int64_t wal_max_size, const struct tt_uuid *instance_uuid,
>+ const char *wal_dirname, int64_t wal_max_size,
>+ const struct tt_uuid *instance_uuid,
>  wal_on_garbage_collection_f on_garbage_collection,
>  wal_on_checkpoint_threshold_f on_checkpoint_threshold)
> {
>@@ -415,7 +413,6 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
>  journal_create(&writer->base,
>  wal_mode == WAL_NONE ?
>  wal_write_none_async : wal_write_async,
>- wall_async_cb,
>  wal_mode == WAL_NONE ?
>  wal_write_none : wal_write);
> 
>@@ -525,15 +522,15 @@ wal_open(struct wal_writer *writer)
> }
> 
> int
>-wal_init(enum wal_mode wal_mode, void (*wall_async_cb)(struct journal_entry *entry),
>- const char *wal_dirname, int64_t wal_max_size, const struct tt_uuid *instance_uuid,
>+wal_init(enum wal_mode wal_mode, const char *wal_dirname,
>+ int64_t wal_max_size, const struct tt_uuid *instance_uuid,
>  wal_on_garbage_collection_f on_garbage_collection,
>  wal_on_checkpoint_threshold_f on_checkpoint_threshold)
> {
>  /* Initialize the state. */
>  struct wal_writer *writer = &wal_writer_singleton;
>- wal_writer_create(writer, wal_mode, wall_async_cb, wal_dirname,
>- wal_max_size, instance_uuid, on_garbage_collection,
>+ wal_writer_create(writer, wal_mode, wal_dirname, wal_max_size,
>+ instance_uuid, on_garbage_collection,
>  on_checkpoint_threshold);
> 
>  /* Start WAL thread. */
>@@ -1314,7 +1311,7 @@ wal_write_none_async(struct journal *journal,
>  vclock_merge(&writer->vclock, &vclock_diff);
>  vclock_copy(&replicaset.vclock, &writer->vclock);
>  entry->res = vclock_sum(&writer->vclock);
>- journal_async_complete(journal, entry);
>+ journal_async_complete(entry);
>  return 0;
> }
> 
>diff --git a/src/box/wal.h b/src/box/wal.h
>index f348dc636..9d0cada46 100644
>--- a/src/box/wal.h
>+++ b/src/box/wal.h
>@@ -81,8 +81,8 @@ typedef void (*wal_on_checkpoint_threshold_f)(void);
>  * Start WAL thread and initialize WAL writer.
>  */
> int
>-wal_init(enum wal_mode wal_mode, void (*wall_async_cb)(struct journal_entry *entry),
>- const char *wal_dirname, int64_t wal_max_size, const struct tt_uuid *instance_uuid,
>+wal_init(enum wal_mode wal_mode, const char *wal_dirname,
>+ int64_t wal_max_size, const struct tt_uuid *instance_uuid,
>  wal_on_garbage_collection_f on_garbage_collection,
>  wal_on_checkpoint_threshold_f on_checkpoint_threshold);
> 
>--
>2.26.2
 
 
--
Serge Petrenko  
 
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.tarantool.org/pipermail/tarantool-patches/attachments/20200821/6b4462a4/attachment.html>


More information about the Tarantool-patches mailing list