Hi! Thanks for the patch! LGTM.   >Четверг, 20 августа 2020, 0:35 +03:00 от Cyrill Gorcunov : >  >In commit 77ba0e3504464131fe81c672d508d0275be2173a we've redesigned >wal journal operations such that asynchronous write completion >is a single instance per journal. > >It turned out that such a simplification is too tight and doesn't >allow us to pass entries into the journal with custom completions. > >Thus let's bring this ability back. We will need it to be able >to write "confirm" records into wal directly without touching >transaction code at all. > >Part-of #5129 > >Signed-off-by: Cyrill Gorcunov < gorcunov@gmail.com > >--- > src/box/box.cc | 15 ++++++++------- > src/box/journal.c | 2 ++ > src/box/journal.h | 20 +++++++++++--------- > src/box/txn.c | 2 +- > src/box/vy_log.c | 2 +- > src/box/wal.c | 19 ++++++++----------- > src/box/wal.h | 4 ++-- > 7 files changed, 33 insertions(+), 31 deletions(-) > >diff --git a/src/box/box.cc b/src/box/box.cc >index 8e811e9c1..faffd5769 100644 >--- a/src/box/box.cc >+++ b/src/box/box.cc >@@ -348,7 +348,7 @@ recovery_journal_write(struct journal *base, >  * Since there're no actual writes, fire a >  * journal_async_complete callback right away. 
>  */ >- journal_async_complete(base, entry); >+ journal_async_complete(entry); >  return 0; > } >  >@@ -357,7 +357,7 @@ recovery_journal_create(struct vclock *v) > { >  static struct recovery_journal journal; >  journal_create(&journal.base, recovery_journal_write, >- txn_complete_async, recovery_journal_write); >+ recovery_journal_write); >  journal.vclock = v; >  journal_set(&journal.base); > } >@@ -2182,8 +2182,10 @@ engine_init() > static int > bootstrap_journal_write(struct journal *base, struct journal_entry *entry) > { >+ (void)base; >+ >  entry->res = 0; >- journal_async_complete(base, entry); >+ journal_async_complete(entry); >  return 0; > } >  >@@ -2569,8 +2571,8 @@ box_cfg_xc(void) >  >  int64_t wal_max_size = box_check_wal_max_size(cfg_geti64("wal_max_size")); >  enum wal_mode wal_mode = box_check_wal_mode(cfg_gets("wal_mode")); >- if (wal_init(wal_mode, txn_complete_async, cfg_gets("wal_dir"), >- wal_max_size, &INSTANCE_UUID, on_wal_garbage_collection, >+ if (wal_init(wal_mode, cfg_gets("wal_dir"), wal_max_size, >+ &INSTANCE_UUID, on_wal_garbage_collection, >  on_wal_checkpoint_threshold) != 0) { >  diag_raise(); >  } >@@ -2617,8 +2619,7 @@ box_cfg_xc(void) >  } >  >  struct journal bootstrap_journal; >- journal_create(&bootstrap_journal, NULL, txn_complete_async, >- bootstrap_journal_write); >+ journal_create(&bootstrap_journal, NULL, bootstrap_journal_write); >  journal_set(&bootstrap_journal); >  auto bootstrap_journal_guard = make_scoped_guard([] { >  journal_set(NULL); >diff --git a/src/box/journal.c b/src/box/journal.c >index f1e89aaa2..48af9157b 100644 >--- a/src/box/journal.c >+++ b/src/box/journal.c >@@ -36,6 +36,7 @@ struct journal *current_journal = NULL; >  > struct journal_entry * > journal_entry_new(size_t n_rows, struct region *region, >+ journal_write_async_f write_async_cb, >  void *complete_data) > { >  struct journal_entry *entry; >@@ -50,6 +51,7 @@ journal_entry_new(size_t n_rows, struct region *region, >  return NULL; >  } >  >+ 
entry->write_async_cb = write_async_cb; >  entry->complete_data = complete_data; >  entry->approx_len = 0; >  entry->n_rows = n_rows; >diff --git a/src/box/journal.h b/src/box/journal.h >index 1a10e66c3..4b019fecf 100644 >--- a/src/box/journal.h >+++ b/src/box/journal.h >@@ -42,6 +42,8 @@ extern "C" { > struct xrow_header; > struct journal_entry; >  >+typedef void (*journal_write_async_f)(struct journal_entry *entry); >+ > /** >  * An entry for an abstract journal. >  * Simply put, a write ahead log request. >@@ -61,6 +63,10 @@ struct journal_entry { >  * A journal entry completion callback argument. >  */ >  void *complete_data; >+ /** >+ * Asynchronous write completion function. >+ */ >+ journal_write_async_f write_async_cb; >  /** >  * Approximate size of this request when encoded. >  */ >@@ -84,6 +90,7 @@ struct region; >  */ > struct journal_entry * > journal_entry_new(size_t n_rows, struct region *region, >+ journal_write_async_f write_async_cb, >  void *complete_data); >  > /** >@@ -96,22 +103,19 @@ struct journal { >  int (*write_async)(struct journal *journal, >  struct journal_entry *entry); >  >- /** Asynchronous write completion */ >- void (*write_async_cb)(struct journal_entry *entry); >- >  /** Synchronous write */ >  int (*write)(struct journal *journal, >  struct journal_entry *entry); > }; >  > /** >- * Finalize a single entry. >+ * Complete asynchronous write. 
>  */ > static inline void >-journal_async_complete(struct journal *journal, struct journal_entry *entry) >+journal_async_complete(struct journal_entry *entry) > { >- assert(journal->write_async_cb != NULL); >- journal->write_async_cb(entry); >+ assert(entry->write_async_cb != NULL); >+ entry->write_async_cb(entry); > } >  > /** >@@ -173,12 +177,10 @@ static inline void > journal_create(struct journal *journal, >  int (*write_async)(struct journal *journal, >  struct journal_entry *entry), >- void (*write_async_cb)(struct journal_entry *entry), >  int (*write)(struct journal *journal, >  struct journal_entry *entry)) > { >  journal->write_async = write_async; >- journal->write_async_cb = write_async_cb; >  journal->write = write; > } >  >diff --git a/src/box/txn.c b/src/box/txn.c >index 9c21258c5..cc1f496c5 100644 >--- a/src/box/txn.c >+++ b/src/box/txn.c >@@ -551,7 +551,7 @@ txn_journal_entry_new(struct txn *txn) >  >  /* Save space for an additional NOP row just in case. */ >  req = journal_entry_new(txn->n_new_rows + txn->n_applier_rows + 1, >- &txn->region, txn); >+ &txn->region, txn_complete_async, txn); >  if (req == NULL) >  return NULL; >  >diff --git a/src/box/vy_log.c b/src/box/vy_log.c >index 311985c72..de4c5205c 100644 >--- a/src/box/vy_log.c >+++ b/src/box/vy_log.c >@@ -818,7 +818,7 @@ vy_log_tx_flush(struct vy_log_tx *tx) >  size_t used = region_used(&fiber()->gc); >  >  struct journal_entry *entry; >- entry = journal_entry_new(tx_size, &fiber()->gc, NULL); >+ entry = journal_entry_new(tx_size, &fiber()->gc, NULL, NULL); >  if (entry == NULL) >  goto err; >  >diff --git a/src/box/wal.c b/src/box/wal.c >index d8c92aa36..045006b60 100644 >--- a/src/box/wal.c >+++ b/src/box/wal.c >@@ -266,10 +266,9 @@ xlog_write_entry(struct xlog *l, struct journal_entry *entry) > static void > tx_schedule_queue(struct stailq *queue) > { >- struct wal_writer *writer = &wal_writer_singleton; >  struct journal_entry *req, *tmp; >  stailq_foreach_entry_safe(req, tmp, queue, 
fifo) >- journal_async_complete(&writer->base, req); >+ journal_async_complete(req); > } >  > /** >@@ -403,9 +402,8 @@ tx_notify_checkpoint(struct cmsg *msg) >  */ > static void > wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode, >- void (*wall_async_cb)(struct journal_entry *entry), >- const char *wal_dirname, >- int64_t wal_max_size, const struct tt_uuid *instance_uuid, >+ const char *wal_dirname, int64_t wal_max_size, >+ const struct tt_uuid *instance_uuid, >  wal_on_garbage_collection_f on_garbage_collection, >  wal_on_checkpoint_threshold_f on_checkpoint_threshold) > { >@@ -415,7 +413,6 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode, >  journal_create(&writer->base, >  wal_mode == WAL_NONE ? >  wal_write_none_async : wal_write_async, >- wall_async_cb, >  wal_mode == WAL_NONE ? >  wal_write_none : wal_write); >  >@@ -525,15 +522,15 @@ wal_open(struct wal_writer *writer) > } >  > int >-wal_init(enum wal_mode wal_mode, void (*wall_async_cb)(struct journal_entry *entry), >- const char *wal_dirname, int64_t wal_max_size, const struct tt_uuid *instance_uuid, >+wal_init(enum wal_mode wal_mode, const char *wal_dirname, >+ int64_t wal_max_size, const struct tt_uuid *instance_uuid, >  wal_on_garbage_collection_f on_garbage_collection, >  wal_on_checkpoint_threshold_f on_checkpoint_threshold) > { >  /* Initialize the state. */ >  struct wal_writer *writer = &wal_writer_singleton; >- wal_writer_create(writer, wal_mode, wall_async_cb, wal_dirname, >- wal_max_size, instance_uuid, on_garbage_collection, >+ wal_writer_create(writer, wal_mode, wal_dirname, wal_max_size, >+ instance_uuid, on_garbage_collection, >  on_checkpoint_threshold); >  >  /* Start WAL thread. 
*/ >@@ -1314,7 +1311,7 @@ wal_write_none_async(struct journal *journal, >  vclock_merge(&writer->vclock, &vclock_diff); >  vclock_copy(&replicaset.vclock, &writer->vclock); >  entry->res = vclock_sum(&writer->vclock); >- journal_async_complete(journal, entry); >+ journal_async_complete(entry); >  return 0; > } >  >diff --git a/src/box/wal.h b/src/box/wal.h >index f348dc636..9d0cada46 100644 >--- a/src/box/wal.h >+++ b/src/box/wal.h >@@ -81,8 +81,8 @@ typedef void (*wal_on_checkpoint_threshold_f)(void); >  * Start WAL thread and initialize WAL writer. >  */ > int >-wal_init(enum wal_mode wal_mode, void (*wall_async_cb)(struct journal_entry *entry), >- const char *wal_dirname, int64_t wal_max_size, const struct tt_uuid *instance_uuid, >+wal_init(enum wal_mode wal_mode, const char *wal_dirname, >+ int64_t wal_max_size, const struct tt_uuid *instance_uuid, >  wal_on_garbage_collection_f on_garbage_collection, >  wal_on_checkpoint_threshold_f on_checkpoint_threshold); >  >-- >2.26.2     -- Serge Petrenko