<HTML><BODY><div>Hi! Thanks for the patch! LGTM.</div><div> </div><blockquote style="border-left:1px solid #0857A6; margin:10px; padding:0 0 0 10px;">Четверг, 20 августа 2020, 0:35 +03:00 от Cyrill Gorcunov &lt;gorcunov@gmail.com&gt;:<br> <div id=""><div class="js-helper js-readmsg-msg"><style type="text/css"></style><div><div id="style_15978729131088848239_BODY">In commit 77ba0e3504464131fe81c672d508d0275be2173a we've redesigned<br>wal journal operations such that asynchronous write completion<br>is a single instance per journal.<br><br>It turned out that such simplification is too tight and doesn't<br>allow us to pass entries into the journal with custom completions.<br><br>Thus lets allow back such ability. We will need it to be able<br>to write "confirm" records into wal directly without touching<br>transactions code at all.<br><br>Part-of #5129<br><br>Signed-off-by: Cyrill Gorcunov &lt;<a href="/compose?To=gorcunov@gmail.com">gorcunov@gmail.com</a>&gt;<br>---<br> src/box/box.cc | 15 ++++++++-------<br> src/box/journal.c | 2 ++<br> src/box/journal.h | 20 +++++++++++---------<br> src/box/txn.c | 2 +-<br> src/box/vy_log.c | 2 +-<br> src/box/wal.c | 19 ++++++++-----------<br> src/box/wal.h | 4 ++--<br> 7 files changed, 33 insertions(+), 31 deletions(-)<br><br>diff --git a/src/box/box.cc b/src/box/box.cc<br>index 8e811e9c1..faffd5769 100644<br>--- a/src/box/box.cc<br>+++ b/src/box/box.cc<br>@@ -348,7 +348,7 @@ recovery_journal_write(struct journal *base,<br>  * Since there're no actual writes, fire a<br>  * journal_async_complete callback right away.<br>  */<br>- journal_async_complete(base, entry);<br>+ journal_async_complete(entry);<br>  return 0;<br> }<br> <br>@@ -357,7 +357,7 @@ recovery_journal_create(struct vclock *v)<br> {<br>  static struct recovery_journal journal;<br>  journal_create(&journal.base, recovery_journal_write,<br>- txn_complete_async, recovery_journal_write);<br>+ recovery_journal_write);<br>  journal.vclock = v;<br>  journal_set(&journal.base);<br> 
}<br>@@ -2182,8 +2182,10 @@ engine_init()<br> static int<br> bootstrap_journal_write(struct journal *base, struct journal_entry *entry)<br> {<br>+ (void)base;<br>+<br>  entry->res = 0;<br>- journal_async_complete(base, entry);<br>+ journal_async_complete(entry);<br>  return 0;<br> }<br> <br>@@ -2569,8 +2571,8 @@ box_cfg_xc(void)<br> <br>  int64_t wal_max_size = box_check_wal_max_size(cfg_geti64("wal_max_size"));<br>  enum wal_mode wal_mode = box_check_wal_mode(cfg_gets("wal_mode"));<br>- if (wal_init(wal_mode, txn_complete_async, cfg_gets("wal_dir"),<br>- wal_max_size, &INSTANCE_UUID, on_wal_garbage_collection,<br>+ if (wal_init(wal_mode, cfg_gets("wal_dir"), wal_max_size,<br>+ &INSTANCE_UUID, on_wal_garbage_collection,<br>  on_wal_checkpoint_threshold) != 0) {<br>  diag_raise();<br>  }<br>@@ -2617,8 +2619,7 @@ box_cfg_xc(void)<br>  }<br> <br>  struct journal bootstrap_journal;<br>- journal_create(&bootstrap_journal, NULL, txn_complete_async,<br>- bootstrap_journal_write);<br>+ journal_create(&bootstrap_journal, NULL, bootstrap_journal_write);<br>  journal_set(&bootstrap_journal);<br>  auto bootstrap_journal_guard = make_scoped_guard([] {<br>  journal_set(NULL);<br>diff --git a/src/box/journal.c b/src/box/journal.c<br>index f1e89aaa2..48af9157b 100644<br>--- a/src/box/journal.c<br>+++ b/src/box/journal.c<br>@@ -36,6 +36,7 @@ struct journal *current_journal = NULL;<br> <br> struct journal_entry *<br> journal_entry_new(size_t n_rows, struct region *region,<br>+ journal_write_async_f write_async_cb,<br>  void *complete_data)<br> {<br>  struct journal_entry *entry;<br>@@ -50,6 +51,7 @@ journal_entry_new(size_t n_rows, struct region *region,<br>  return NULL;<br>  }<br> <br>+ entry->write_async_cb = write_async_cb;<br>  entry->complete_data = complete_data;<br>  entry->approx_len = 0;<br>  entry->n_rows = n_rows;<br>diff --git a/src/box/journal.h b/src/box/journal.h<br>index 1a10e66c3..4b019fecf 100644<br>--- a/src/box/journal.h<br>+++ b/src/box/journal.h<br>@@ -42,6 
+42,8 @@ extern "C" {<br> struct xrow_header;<br> struct journal_entry;<br> <br>+typedef void (*journal_write_async_f)(struct journal_entry *entry);<br>+<br> /**<br>  * An entry for an abstract journal.<br>  * Simply put, a write ahead log request.<br>@@ -61,6 +63,10 @@ struct journal_entry {<br>  * A journal entry completion callback argument.<br>  */<br>  void *complete_data;<br>+ /**<br>+ * Asynchronous write completion function.<br>+ */<br>+ journal_write_async_f write_async_cb;<br>  /**<br>  * Approximate size of this request when encoded.<br>  */<br>@@ -84,6 +90,7 @@ struct region;<br>  */<br> struct journal_entry *<br> journal_entry_new(size_t n_rows, struct region *region,<br>+ journal_write_async_f write_async_cb,<br>  void *complete_data);<br> <br> /**<br>@@ -96,22 +103,19 @@ struct journal {<br>  int (*write_async)(struct journal *journal,<br>  struct journal_entry *entry);<br> <br>- /** Asynchronous write completion */<br>- void (*write_async_cb)(struct journal_entry *entry);<br>-<br>  /** Synchronous write */<br>  int (*write)(struct journal *journal,<br>  struct journal_entry *entry);<br> };<br> <br> /**<br>- * Finalize a single entry.<br>+ * Complete asynchronous write.<br>  */<br> static inline void<br>-journal_async_complete(struct journal *journal, struct journal_entry *entry)<br>+journal_async_complete(struct journal_entry *entry)<br> {<br>- assert(journal->write_async_cb != NULL);<br>- journal->write_async_cb(entry);<br>+ assert(entry->write_async_cb != NULL);<br>+ entry->write_async_cb(entry);<br> }<br> <br> /**<br>@@ -173,12 +177,10 @@ static inline void<br> journal_create(struct journal *journal,<br>  int (*write_async)(struct journal *journal,<br>  struct journal_entry *entry),<br>- void (*write_async_cb)(struct journal_entry *entry),<br>  int (*write)(struct journal *journal,<br>  struct journal_entry *entry))<br> {<br>  journal->write_async = write_async;<br>- journal->write_async_cb = write_async_cb;<br>  journal->write = write;<br> }<br> 
<br>diff --git a/src/box/txn.c b/src/box/txn.c<br>index 9c21258c5..cc1f496c5 100644<br>--- a/src/box/txn.c<br>+++ b/src/box/txn.c<br>@@ -551,7 +551,7 @@ txn_journal_entry_new(struct txn *txn)<br> <br>  /* Save space for an additional NOP row just in case. */<br>  req = journal_entry_new(txn->n_new_rows + txn->n_applier_rows + 1,<br>- &txn->region, txn);<br>+ &txn->region, txn_complete_async, txn);<br>  if (req == NULL)<br>  return NULL;<br> <br>diff --git a/src/box/vy_log.c b/src/box/vy_log.c<br>index 311985c72..de4c5205c 100644<br>--- a/src/box/vy_log.c<br>+++ b/src/box/vy_log.c<br>@@ -818,7 +818,7 @@ vy_log_tx_flush(struct vy_log_tx *tx)<br>  size_t used = region_used(&fiber()->gc);<br> <br>  struct journal_entry *entry;<br>- entry = journal_entry_new(tx_size, &fiber()->gc, NULL);<br>+ entry = journal_entry_new(tx_size, &fiber()->gc, NULL, NULL);<br>  if (entry == NULL)<br>  goto err;<br> <br>diff --git a/src/box/wal.c b/src/box/wal.c<br>index d8c92aa36..045006b60 100644<br>--- a/src/box/wal.c<br>+++ b/src/box/wal.c<br>@@ -266,10 +266,9 @@ xlog_write_entry(struct xlog *l, struct journal_entry *entry)<br> static void<br> tx_schedule_queue(struct stailq *queue)<br> {<br>- struct wal_writer *writer = &wal_writer_singleton;<br>  struct journal_entry *req, *tmp;<br>  stailq_foreach_entry_safe(req, tmp, queue, fifo)<br>- journal_async_complete(&writer->base, req);<br>+ journal_async_complete(req);<br> }<br> <br> /**<br>@@ -403,9 +402,8 @@ tx_notify_checkpoint(struct cmsg *msg)<br>  */<br> static void<br> wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,<br>- void (*wall_async_cb)(struct journal_entry *entry),<br>- const char *wal_dirname,<br>- int64_t wal_max_size, const struct tt_uuid *instance_uuid,<br>+ const char *wal_dirname, int64_t wal_max_size,<br>+ const struct tt_uuid *instance_uuid,<br>  wal_on_garbage_collection_f on_garbage_collection,<br>  wal_on_checkpoint_threshold_f on_checkpoint_threshold)<br> {<br>@@ -415,7 +413,6 @@ 
wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,<br>  journal_create(&writer->base,<br>  wal_mode == WAL_NONE ?<br>  wal_write_none_async : wal_write_async,<br>- wall_async_cb,<br>  wal_mode == WAL_NONE ?<br>  wal_write_none : wal_write);<br> <br>@@ -525,15 +522,15 @@ wal_open(struct wal_writer *writer)<br> }<br> <br> int<br>-wal_init(enum wal_mode wal_mode, void (*wall_async_cb)(struct journal_entry *entry),<br>- const char *wal_dirname, int64_t wal_max_size, const struct tt_uuid *instance_uuid,<br>+wal_init(enum wal_mode wal_mode, const char *wal_dirname,<br>+ int64_t wal_max_size, const struct tt_uuid *instance_uuid,<br>  wal_on_garbage_collection_f on_garbage_collection,<br>  wal_on_checkpoint_threshold_f on_checkpoint_threshold)<br> {<br>  /* Initialize the state. */<br>  struct wal_writer *writer = &wal_writer_singleton;<br>- wal_writer_create(writer, wal_mode, wall_async_cb, wal_dirname,<br>- wal_max_size, instance_uuid, on_garbage_collection,<br>+ wal_writer_create(writer, wal_mode, wal_dirname, wal_max_size,<br>+ instance_uuid, on_garbage_collection,<br>  on_checkpoint_threshold);<br> <br>  /* Start WAL thread. 
*/<br>@@ -1314,7 +1311,7 @@ wal_write_none_async(struct journal *journal,<br>  vclock_merge(&writer->vclock, &vclock_diff);<br>  vclock_copy(&replicaset.vclock, &writer->vclock);<br>  entry->res = vclock_sum(&writer->vclock);<br>- journal_async_complete(journal, entry);<br>+ journal_async_complete(entry);<br>  return 0;<br> }<br> <br>diff --git a/src/box/wal.h b/src/box/wal.h<br>index f348dc636..9d0cada46 100644<br>--- a/src/box/wal.h<br>+++ b/src/box/wal.h<br>@@ -81,8 +81,8 @@ typedef void (*wal_on_checkpoint_threshold_f)(void);<br>  * Start WAL thread and initialize WAL writer.<br>  */<br> int<br>-wal_init(enum wal_mode wal_mode, void (*wall_async_cb)(struct journal_entry *entry),<br>- const char *wal_dirname, int64_t wal_max_size, const struct tt_uuid *instance_uuid,<br>+wal_init(enum wal_mode wal_mode, const char *wal_dirname,<br>+ int64_t wal_max_size, const struct tt_uuid *instance_uuid,<br>  wal_on_garbage_collection_f on_garbage_collection,<br>  wal_on_checkpoint_threshold_f on_checkpoint_threshold);<br> <br>--<br>2.26.2</div></div></div></div></blockquote><div> </div><div><div> <div><div>--<br>Serge Petrenko<span id="cke_bm_149E" style="display: none;"> </span></div></div></div></div><div> </div></BODY></HTML>