* [Tarantool-patches] [PATCH v13 00/11] box/journal: redesign sync and async writes
From: Cyrill Gorcunov @ 2020-03-19 9:05 UTC
To: tml

Kostya, please take a look once time permits. I have added your Acks
where appropriate; please ping me if you disagree with anything.

https://gitlab.com/tarantool/tarantool/pipelines/127699966
branch gorcunov/gh-4031-txn_write_to_wal-13

Cyrill Gorcunov (11):
  box: recovery_journal_create -- set journal here
  box: recovery_journal -- declare it as static
  box/txn: move fiber_set_txn to header
  box/txn: rename txn_write to txn_commit_async
  box/txn: move setup of txn start to txn_prepare
  box/txn: add txn_commit_nop helper
  box/txn: rename txn_entry_complete_cb to txn_complete_async
  box/txn: unweave txn_commit from txn_commit_async
  box/txn: clear fiber storage right before journal write
  box/txn: move journal allocation into separate routine
  box/journal: redesign journal operations

 src/box/applier.cc |   2 +-
 src/box/box.cc     |  21 ++++---
 src/box/journal.c  |  39 +++++++++---
 src/box/journal.h  |  94 ++++++++++++++++++++---------
 src/box/txn.c      | 144 +++++++++++++++++++++++++++++----------------
 src/box/txn.h      |  16 ++++-
 src/box/vy_log.c   |   4 +-
 src/box/wal.c      |  74 ++++++++++++++++++++++---
 8 files changed, 282 insertions(+), 112 deletions(-)


base-commit: 1f7e7aa2bf47445dffc713df336288676b927445
-- 
2.20.1
* [Tarantool-patches] [PATCH v13 01/11] box: recovery_journal_create -- set journal here
From: Cyrill Gorcunov @ 2020-03-19 9:05 UTC
To: tml

This allows us to eliminate code duplication.

Acked-by: Konstantin Osipov <kostja.osipov@gmail.com>
Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
 src/box/box.cc | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index 09dd67ab4..eb5931e37 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -331,6 +331,7 @@ recovery_journal_create(struct recovery_journal *journal, struct vclock *v)
 {
 	journal_create(&journal->base, recovery_journal_write, NULL);
 	journal->vclock = v;
+	journal_set(&journal->base);
 }
 
 static void
@@ -2055,7 +2056,6 @@ bootstrap_from_master(struct replica *master)
 	engine_begin_final_recovery_xc();
 	struct recovery_journal journal;
 	recovery_journal_create(&journal, &replicaset.vclock);
-	journal_set(&journal.base);
 
 	if (!replication_anon) {
 		applier_resume_to_state(applier, APPLIER_JOINED,
@@ -2221,7 +2221,6 @@ local_recovery(const struct tt_uuid *instance_uuid,
 
 	struct recovery_journal journal;
 	recovery_journal_create(&journal, &recovery->vclock);
-	journal_set(&journal.base);
 
 	/*
 	 * We explicitly request memtx to recover its
-- 
2.20.1
* [Tarantool-patches] [PATCH v13 02/11] box: recovery_journal -- declare it as static
From: Cyrill Gorcunov @ 2020-03-19 9:05 UTC
To: tml

There is no need for several instances of the recovery journal
variable. Let's allocate it statically inside the
recovery_journal_create routine, and drop the inline annotation,
since there is no need for this routine to be inline.

Suggested-by: Konstantin Osipov <kostja.osipov@gmail.com>
Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
 src/box/box.cc | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index eb5931e37..cf79affca 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -326,12 +326,14 @@ recovery_journal_write(struct journal *base,
 	return 0;
 }
 
-static inline void
-recovery_journal_create(struct recovery_journal *journal, struct vclock *v)
+static void
+recovery_journal_create(struct vclock *v)
 {
-	journal_create(&journal->base, recovery_journal_write, NULL);
-	journal->vclock = v;
-	journal_set(&journal->base);
+	static struct recovery_journal journal;
+
+	journal_create(&journal.base, recovery_journal_write, NULL);
+	journal.vclock = v;
+	journal_set(&journal.base);
 }
 
 static void
@@ -2054,8 +2056,7 @@ bootstrap_from_master(struct replica *master)
 	 * Process final data (WALs).
 	 */
 	engine_begin_final_recovery_xc();
-	struct recovery_journal journal;
-	recovery_journal_create(&journal, &replicaset.vclock);
+	recovery_journal_create(&replicaset.vclock);
 
 	if (!replication_anon) {
 		applier_resume_to_state(applier, APPLIER_JOINED,
@@ -2219,8 +2220,7 @@ local_recovery(const struct tt_uuid *instance_uuid,
 	memtx = (struct memtx_engine *)engine_by_name("memtx");
 	assert(memtx != NULL);
 
-	struct recovery_journal journal;
-	recovery_journal_create(&journal, &recovery->vclock);
+	recovery_journal_create(&recovery->vclock);
 
 	/*
 	 * We explicitly request memtx to recover its
-- 
2.20.1
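A minimal sketch of the pattern this patch adopts, using reduced stand-in types: the `struct journal`, `struct recovery_journal` and `current_journal` below are hypothetical simplifications, not Tarantool's real definitions. A function-local `static` object has static storage duration, so the pointer installed as the current journal stays valid after the function returns, and every call reinitializes the same single instance.

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical, reduced stand-ins for Tarantool's journal types. */
struct journal {
	int64_t (*write)(struct journal *journal);
};

struct recovery_journal {
	struct journal base;   /* first member, so the cast below is valid */
	const int64_t *vclock; /* simplified: points at a single counter */
};

/* Hypothetical stand-in for current_journal/journal_set(). */
static struct journal *current_journal;

static int64_t
recovery_journal_write(struct journal *base)
{
	struct recovery_journal *journal = (struct recovery_journal *)base;
	/* Report the current "vclock" value as the write result. */
	return journal->vclock != NULL ? *journal->vclock : -1;
}

static void
recovery_journal_create(const int64_t *v)
{
	/* One instance for the whole process, never on a caller's stack. */
	static struct recovery_journal journal;

	journal.base.write = recovery_journal_write;
	journal.vclock = v;
	current_journal = &journal.base;
}
```

Calling it twice reuses one object; the trade-off is that there can only ever be one recovery journal, which is exactly the property the commit message relies on.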
* [Tarantool-patches] [PATCH v13 03/11] box/txn: move fiber_set_txn to header
From: Cyrill Gorcunov @ 2020-03-19 9:05 UTC
To: tml

We will use it inside the WAL engine. Moreover, we already have a
"get" function in this header, named in_txn(). Keeping both the
getter and the setter in one place is more consistent.

Acked-by: Konstantin Osipov <kostja.osipov@gmail.com>
Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
 src/box/txn.c | 6 ------
 src/box/txn.h | 7 +++++++
 2 files changed, 7 insertions(+), 6 deletions(-)

diff --git a/src/box/txn.c b/src/box/txn.c
index a4ca48224..6799f6c4b 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -49,12 +49,6 @@ txn_on_yield(struct trigger *trigger, void *event);
 static void
 txn_run_rollback_triggers(struct txn *txn, struct rlist *triggers);
 
-static inline void
-fiber_set_txn(struct fiber *fiber, struct txn *txn)
-{
-	fiber->storage.txn = txn;
-}
-
 static int
 txn_add_redo(struct txn *txn, struct txn_stmt *stmt, struct request *request)
 {
diff --git a/src/box/txn.h b/src/box/txn.h
index ae2c3a58f..7a7e52954 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -256,6 +256,13 @@ in_txn(void)
 	return fiber()->storage.txn;
 }
 
+/* Set to the current transaction (if any) */
+static inline void
+fiber_set_txn(struct fiber *fiber, struct txn *txn)
+{
+	fiber->storage.txn = txn;
+}
+
 /**
  * Start a transaction explicitly.
  * @pre no transaction is active
-- 
2.20.1
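The getter/setter pair now living side by side in txn.h can be modelled in isolation. This is a reduced sketch: the single global `current_fiber` and the stripped-down `struct fiber` are hypothetical stand-ins for the real fiber machinery, while `in_txn()` and `fiber_set_txn()` mirror the header code.

```c
#include <stddef.h>

struct txn; /* opaque in this sketch */

/* Hypothetical reduction of a fiber: only the storage slot is kept. */
struct fiber {
	struct {
		struct txn *txn;
	} storage;
};

static struct fiber current_fiber; /* single-fiber stand-in */

static inline struct fiber *
fiber(void)
{
	return &current_fiber;
}

/** Getter: the transaction bound to the current fiber, if any. */
static inline struct txn *
in_txn(void)
{
	return fiber()->storage.txn;
}

/** Setter: bind (or, with NULL, unbind) a transaction. */
static inline void
fiber_set_txn(struct fiber *fiber, struct txn *txn)
{
	fiber->storage.txn = txn;
}
```

Passing NULL is how later patches in the series detach a transaction from the fiber before handing it to the journal.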
* [Tarantool-patches] [PATCH v13 04/11] box/txn: rename txn_write to txn_commit_async
From: Cyrill Gorcunov @ 2020-03-19 9:05 UTC
To: tml

To reflect the fact that we don't wait for the transaction to
complete but rely on a completion callback instead.

Acked-by: Konstantin Osipov <kostja.osipov@gmail.com>
Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
 src/box/applier.cc | 2 +-
 src/box/txn.c      | 4 ++--
 src/box/txn.h      | 2 +-
 3 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index ff40e03c6..8666a3a98 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -805,7 +805,7 @@ applier_apply_tx(struct stailq *rows)
 	trigger_create(on_commit, applier_txn_commit_cb, NULL, NULL);
 	txn_on_commit(txn, on_commit);
 
-	if (txn_write(txn) < 0)
+	if (txn_commit_async(txn) < 0)
 		goto fail;
 
 	/* Transaction was sent to journal so promote vclock. */
diff --git a/src/box/txn.c b/src/box/txn.c
index 6799f6c4b..9f61303ab 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -558,7 +558,7 @@ txn_prepare(struct txn *txn)
 }
 
 int
-txn_write(struct txn *txn)
+txn_commit_async(struct txn *txn)
 {
 	if (txn_prepare(txn) != 0) {
 		txn_rollback(txn);
@@ -586,7 +586,7 @@ txn_commit(struct txn *txn)
 {
 	txn->fiber = fiber();
 
-	if (txn_write(txn) != 0)
+	if (txn_commit_async(txn) != 0)
 		return -1;
 	/*
 	 * In case of non-yielding journal the transaction could already
diff --git a/src/box/txn.h b/src/box/txn.h
index 7a7e52954..b950e2e18 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -300,7 +300,7 @@ txn_rollback(struct txn *txn);
  * freed.
  */
 int
-txn_write(struct txn *txn);
+txn_commit_async(struct txn *txn);
 
 /**
  * Most txns don't have triggers, and txn objects
-- 
2.20.1
* [Tarantool-patches] [PATCH v13 05/11] box/txn: move setup of txn start to txn_prepare
From: Cyrill Gorcunov @ 2020-03-19 9:05 UTC
To: tml

For the sake of unification, we will handle nop transactions via a
common helper for both the sync and async cases.

Acked-by: Konstantin Osipov <kostja.osipov@gmail.com>
Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
 src/box/txn.c | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/src/box/txn.c b/src/box/txn.c
index 9f61303ab..6bb1b06ed 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -554,6 +554,8 @@ txn_prepare(struct txn *txn)
 	trigger_clear(&txn->fiber_on_stop);
 	if (!txn_has_flag(txn, TXN_CAN_YIELD))
 		trigger_clear(&txn->fiber_on_yield);
+
+	txn->start_tm = ev_monotonic_now(loop());
 	return 0;
 }
 
@@ -569,7 +571,6 @@ txn_commit_async(struct txn *txn)
 	 * After this point the transaction must not be used
 	 * so reset the corresponding key in the fiber storage.
 	 */
-	txn->start_tm = ev_monotonic_now(loop());
 	if (txn->n_new_rows + txn->n_applier_rows == 0) {
 		/* Nothing to do. */
 		txn->signature = 0;
-- 
2.20.1
* [Tarantool-patches] [PATCH v13 06/11] box/txn: add txn_commit_nop helper
From: Cyrill Gorcunov @ 2020-03-19 9:05 UTC
To: tml

The helper will be reused for synchronous transactions once the
journal redesign is complete.

Acked-by: Konstantin Osipov <kostja.osipov@gmail.com>
Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
 src/box/txn.c | 26 +++++++++++++++++++-------
 1 file changed, 19 insertions(+), 7 deletions(-)

diff --git a/src/box/txn.c b/src/box/txn.c
index 6bb1b06ed..7a2a4877e 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -559,6 +559,22 @@ txn_prepare(struct txn *txn)
 	return 0;
 }
 
+/**
+ * Complete transaction early if it is barely nop.
+ */
+static bool
+txn_commit_nop(struct txn *txn)
+{
+	if (txn->n_new_rows + txn->n_applier_rows == 0) {
+		txn->signature = 0;
+		txn_complete(txn);
+		fiber_set_txn(fiber(), NULL);
+		return true;
+	}
+
+	return false;
+}
+
 int
 txn_commit_async(struct txn *txn)
 {
@@ -567,17 +583,13 @@ txn_commit_async(struct txn *txn)
 		return -1;
 	}
 
+	if (txn_commit_nop(txn))
+		return 0;
+
 	/*
 	 * After this point the transaction must not be used
 	 * so reset the corresponding key in the fiber storage.
 	 */
-	if (txn->n_new_rows + txn->n_applier_rows == 0) {
-		/* Nothing to do. */
-		txn->signature = 0;
-		txn_complete(txn);
-		fiber_set_txn(fiber(), NULL);
-		return 0;
-	}
 	fiber_set_txn(fiber(), NULL);
 	return txn_write_to_wal(txn);
 }
-- 
2.20.1
* [Tarantool-patches] [PATCH v13 07/11] box/txn: rename txn_entry_complete_cb to txn_complete_async
From: Cyrill Gorcunov @ 2020-03-19 9:05 UTC
To: tml

We will use this function inside the WAL engine right after the
journal redesign is complete.

Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
 src/box/txn.c | 8 ++++----
 src/box/txn.h | 7 +++++++
 2 files changed, 11 insertions(+), 4 deletions(-)

diff --git a/src/box/txn.c b/src/box/txn.c
index 7a2a4877e..60da5b564 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -462,10 +462,10 @@ txn_complete(struct txn *txn)
 	}
 }
 
-static void
-txn_entry_complete_cb(struct journal_entry *entry, void *data)
+void
+txn_complete_async(struct journal_entry *entry, void *complete_data)
 {
-	struct txn *txn = data;
+	struct txn *txn = complete_data;
 	txn->signature = entry->res;
 	/*
 	 * Some commit/rollback triggers require for in_txn fiber
@@ -487,7 +487,7 @@ txn_write_to_wal(struct txn *txn)
 	struct journal_entry *req = journal_entry_new(txn->n_new_rows +
 						      txn->n_applier_rows,
 						      &txn->region,
-						      txn_entry_complete_cb,
+						      txn_complete_async,
 						      txn);
 	if (req == NULL) {
 		txn_rollback(txn);
diff --git a/src/box/txn.h b/src/box/txn.h
index b950e2e18..572c76d84 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -44,6 +44,7 @@ extern "C" {
 /** box statistics */
 extern struct rmean *rmean_box;
 
+struct journal_entry;
 struct engine;
 struct space;
 struct tuple;
@@ -287,6 +288,12 @@ txn_commit(struct txn *txn);
 void
 txn_rollback(struct txn *txn);
 
+/**
+ * Complete asynchronous transaction.
+ */
+void
+txn_complete_async(struct journal_entry *entry, void *complete_data);
+
 /**
  * Submit a transaction to the journal.
  * @pre txn == in_txn()
-- 
2.20.1
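After the rename, txn_complete_async() has the shape of a generic completion callback: it receives the finished entry plus an opaque `void *` that it casts back to its transaction. The toy model below shows how such a callback is typically driven; `async_queue`, `queue_submit`, `queue_flush` and `toy_txn` are hypothetical and not the WAL implementation. Entries are queued immediately, and the callback fires later, once the backend has a result.

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical, reduced journal entry: result plus opaque callback data. */
struct journal_entry {
	int64_t res;         /* write result, -1 on failure */
	void *complete_data; /* handed back to the callback, e.g. a txn */
};

typedef void (*entry_complete_cb)(struct journal_entry *entry,
				  void *complete_data);

/* Hypothetical asynchronous backend: queue now, complete later. */
#define QUEUE_CAP 4
struct async_queue {
	struct journal_entry *entries[QUEUE_CAP];
	size_t len;
	entry_complete_cb on_complete; /* one callback for every entry */
	int64_t next_lsn;
};

static int
queue_submit(struct async_queue *q, struct journal_entry *entry, void *data)
{
	entry->complete_data = data;
	if (q->len == QUEUE_CAP) {
		entry->res = -1;
		return -1; /* caller still owns the entry and must roll back */
	}
	q->entries[q->len++] = entry;
	return 0;
}

/* Runs "later": assigns results and fires the completion callback. */
static void
queue_flush(struct async_queue *q)
{
	for (size_t i = 0; i < q->len; i++) {
		struct journal_entry *entry = q->entries[i];
		entry->res = q->next_lsn++;
		q->on_complete(entry, entry->complete_data);
	}
	q->len = 0;
}

/* Example callback in the spirit of txn_complete_async(). */
struct toy_txn {
	int64_t signature;
};

static void
toy_txn_complete_async(struct journal_entry *entry, void *complete_data)
{
	struct toy_txn *txn = complete_data;
	txn->signature = entry->res;
}
```

Keeping a single `on_complete` pointer in the backend instead of one per entry mirrors the direction taken in patch 11, where the async callback is set once at journal creation.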
* [Tarantool-patches] [PATCH v13 08/11] box/txn: unweave txn_commit from txn_commit_async
From: Cyrill Gorcunov @ 2020-03-19 9:05 UTC
To: tml

We are going to diverge the sync and async code flows, so make
txn_commit stop using txn_commit_async.

Fixes #4031

Acked-by: Konstantin Osipov <kostja.osipov@gmail.com>
Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
 src/box/txn.c | 15 ++++++++++++++-
 1 file changed, 14 insertions(+), 1 deletion(-)

diff --git a/src/box/txn.c b/src/box/txn.c
index 60da5b564..06043a2d8 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -599,8 +599,21 @@ txn_commit(struct txn *txn)
 {
 	txn->fiber = fiber();
 
-	if (txn_commit_async(txn) != 0)
+	if (txn_prepare(txn) != 0) {
+		txn_rollback(txn);
+		txn_free(txn);
+		return -1;
+	}
+
+	if (txn_commit_nop(txn)) {
+		txn_free(txn);
+		return 0;
+	}
+
+	fiber_set_txn(fiber(), NULL);
+	if (txn_write_to_wal(txn) != 0)
 		return -1;
+
 	/*
 	 * In case of non-yielding journal the transaction could already
 	 * be done and there is nothing to wait in such cases.
-- 
2.20.1
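The practical difference between the two flows is ownership: the asynchronous caller hands the transaction off on success, while the synchronous caller frees it on every path, including the early nop exit added here. A heavily reduced control-flow sketch: `commit_async`, `commit_sync`, `journal_submit` and the boolean fields are hypothetical stand-ins, and the sync error path follows roughly the shape the series converges on by patch 11 rather than this exact patch.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical, heavily reduced transaction for illustrating control flow. */
struct txn {
	int n_rows;         /* rows that would go to the journal */
	bool prepare_fails; /* simulate an engine prepare error */
	bool rolled_back;
	bool freed;
	int64_t signature;
};

static int txn_prepare(struct txn *txn) { return txn->prepare_fails ? -1 : 0; }
static void txn_rollback(struct txn *txn) { txn->rolled_back = true; txn->signature = -1; }
static void txn_free(struct txn *txn) { txn->freed = true; }

static bool
txn_commit_nop(struct txn *txn)
{
	if (txn->n_rows != 0)
		return false;
	txn->signature = 0;
	return true;
}

/* Stand-in journal: a real one would write or queue the rows. */
static int journal_submit(struct txn *txn) { txn->signature = 100; return 0; }

/* Async: on success the txn is left for a completion callback to finish. */
static int
commit_async(struct txn *txn)
{
	if (txn_prepare(txn) != 0) {
		txn_rollback(txn);
		return -1;
	}
	if (txn_commit_nop(txn))
		return 0;
	return journal_submit(txn);
}

/* Sync: the calling fiber owns the txn and frees it on every path. */
static int
commit_sync(struct txn *txn)
{
	if (txn_prepare(txn) != 0) {
		txn_rollback(txn);
		txn_free(txn);
		return -1;
	}
	if (!txn_commit_nop(txn) && journal_submit(txn) != 0) {
		txn_rollback(txn);
		txn_free(txn);
		return -1;
	}
	int rc = txn->signature >= 0 ? 0 : -1;
	txn_free(txn);
	return rc;
}
```

Once the two paths no longer call each other, each can state its ownership rule locally instead of relying on the other's side effects.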
* [Tarantool-patches] [PATCH v13 09/11] box/txn: clear fiber storage right before journal write
From: Cyrill Gorcunov @ 2020-03-19 9:05 UTC
To: tml

Otherwise we won't be able to roll back the transaction if
journal_entry_new() fails to allocate an entry.

Acked-by: Konstantin Osipov <kostja.osipov@gmail.com>
Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
 src/box/txn.c | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/src/box/txn.c b/src/box/txn.c
index 06043a2d8..4e23e9828 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -513,7 +513,13 @@ txn_write_to_wal(struct txn *txn)
 	assert(remote_row == req->rows + txn->n_applier_rows);
 	assert(local_row == remote_row + txn->n_new_rows);
 
-	/* Send the entry to the journal. */
+	/*
+	 * Send the entry to the journal.
+	 *
+	 * After this point the transaction must not be used
+	 * so reset the corresponding key in the fiber storage.
+	 */
+	fiber_set_txn(fiber(), NULL);
 	if (journal_write(req) < 0) {
 		diag_set(ClientError, ER_WAL_IO);
 		diag_log();
@@ -586,11 +592,6 @@ txn_commit_async(struct txn *txn)
 	if (txn_commit_nop(txn))
 		return 0;
 
-	/*
-	 * After this point the transaction must not be used
-	 * so reset the corresponding key in the fiber storage.
-	 */
-	fiber_set_txn(fiber(), NULL);
 	return txn_write_to_wal(txn);
 }
 
@@ -610,7 +611,6 @@ txn_commit(struct txn *txn)
 		return 0;
 	}
 
-	fiber_set_txn(fiber(), NULL);
 	if (txn_write_to_wal(txn) != 0)
 		return -1;
 
-- 
2.20.1
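The ordering argument can be shown with a reduced model. It assumes, as the commit message implies, that rollback expects the transaction to still be bound to the fiber; the sketch expresses that assumption as an `assert`. The names `fiber_txn`, `entry_new` and `txn_write` are hypothetical stand-ins. If the storage were cleared before the allocation, the failing path would trip that assertion.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

struct txn {
	bool rolled_back;
};

struct journal_entry {
	struct txn *txn;
};

/* Hypothetical stand-in for fiber()->storage.txn. */
static struct txn *fiber_txn;

static void
txn_rollback(struct txn *txn)
{
	/* Assumed precondition: the txn is still bound to the fiber. */
	assert(txn == fiber_txn);
	txn->rolled_back = true;
	fiber_txn = NULL;
}

/* Hypothetical allocator; fails on demand to exercise the error path. */
static struct journal_entry entry_storage;

static struct journal_entry *
entry_new(struct txn *txn, bool fail)
{
	if (fail)
		return NULL;
	entry_storage.txn = txn;
	return &entry_storage;
}

static int
journal_write(struct journal_entry *entry)
{
	(void)entry;
	return 0;
}

static int
txn_write(struct txn *txn, bool alloc_fails)
{
	struct journal_entry *req = entry_new(txn, alloc_fails);
	if (req == NULL) {
		/* Still bound to the fiber, so rolling back is legal. */
		txn_rollback(txn);
		return -1;
	}
	/* Unbind only right before handing the entry to the journal. */
	fiber_txn = NULL;
	return journal_write(req);
}
```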
* [Tarantool-patches] [PATCH v13 10/11] box/txn: move journal allocation into separate routine
From: Cyrill Gorcunov @ 2020-03-19 9:05 UTC
To: tml

This makes the code easier to read and allows reusing the journal
entry allocation in both sync and async writes.

Acked-by: Konstantin Osipov <kostja.osipov@gmail.com>
Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
 src/box/txn.c | 57 +++++++++++++++++++++++++++++++++++++++++----------------
 1 file changed, 41 insertions(+), 16 deletions(-)

diff --git a/src/box/txn.c b/src/box/txn.c
index 4e23e9828..11c20aceb 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -478,41 +478,49 @@ txn_complete_async(struct journal_entry *entry, void *complete_data)
 	fiber_set_txn(fiber(), NULL);
 }
 
-static int64_t
-txn_write_to_wal(struct txn *txn)
+static struct journal_entry *
+txn_journal_entry_new(struct txn *txn)
 {
+	struct journal_entry *req;
+	struct txn_stmt *stmt;
+
 	assert(txn->n_new_rows + txn->n_applier_rows > 0);
 
-	/* Prepare a journal entry. */
-	struct journal_entry *req = journal_entry_new(txn->n_new_rows +
-						      txn->n_applier_rows,
-						      &txn->region,
-						      txn_complete_async,
-						      txn);
-	if (req == NULL) {
-		txn_rollback(txn);
-		return -1;
-	}
+	req = journal_entry_new(txn->n_new_rows + txn->n_applier_rows,
+				&txn->region, txn_complete_async, txn);
+	if (req == NULL)
+		return NULL;
 
-	struct txn_stmt *stmt;
 	struct xrow_header **remote_row = req->rows;
 	struct xrow_header **local_row = req->rows + txn->n_applier_rows;
+
 	stailq_foreach_entry(stmt, &txn->stmts, next) {
 		if (stmt->has_triggers) {
 			txn_init_triggers(txn);
 			rlist_splice(&txn->on_commit, &stmt->on_commit);
 		}
+
+		/* A read (e.g. select) request */
 		if (stmt->row == NULL)
-			continue; /* A read (e.g. select) request */
+			continue;
+
 		if (stmt->row->replica_id == 0)
 			*local_row++ = stmt->row;
 		else
 			*remote_row++ = stmt->row;
+
 		req->approx_len += xrow_approx_len(stmt->row);
 	}
+
 	assert(remote_row == req->rows + txn->n_applier_rows);
 	assert(local_row == remote_row + txn->n_new_rows);
 
+	return req;
+}
+
+static int64_t
+txn_write_to_wal(struct journal_entry *req)
+{
 	/*
 	 * Send the entry to the journal.
 	 *
@@ -584,6 +592,8 @@ txn_commit_nop(struct txn *txn)
 int
 txn_commit_async(struct txn *txn)
 {
+	struct journal_entry *req;
+
 	if (txn_prepare(txn) != 0) {
 		txn_rollback(txn);
 		return -1;
@@ -592,12 +602,20 @@ txn_commit_async(struct txn *txn)
 	if (txn_commit_nop(txn))
 		return 0;
 
-	return txn_write_to_wal(txn);
+	req = txn_journal_entry_new(txn);
+	if (req == NULL) {
+		txn_rollback(txn);
+		return -1;
+	}
+
+	return txn_write_to_wal(req);
 }
 
 int
 txn_commit(struct txn *txn)
 {
+	struct journal_entry *req;
+
 	txn->fiber = fiber();
 
 	if (txn_prepare(txn) != 0) {
@@ -611,7 +629,14 @@ txn_commit(struct txn *txn)
 		return 0;
 	}
 
-	if (txn_write_to_wal(txn) != 0)
+	req = txn_journal_entry_new(txn);
+	if (req == NULL) {
+		txn_rollback(txn);
+		txn_free(txn);
+		return -1;
+	}
+
+	if (txn_write_to_wal(req) != 0)
 		return -1;
 
 	/*
-- 
2.20.1
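The row layout built by txn_journal_entry_new() is easy to miss in the diff: applier (remote) rows come first, local rows follow them, and reads are skipped. Below is a reduced, self-contained sketch of just that partitioning. The helper `journal_rows_fill` and the two struct reductions are hypothetical; the loop body and the final invariants mirror the patch.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical reductions of Tarantool's xrow_header and txn_stmt. */
struct xrow_header {
	uint32_t replica_id; /* 0 for rows produced by this instance */
};

struct txn_stmt {
	struct xrow_header *row; /* NULL for a read (e.g. select) */
};

/*
 * Fill rows[] with applier (remote) rows first and local rows after
 * them, preserving order within each group and skipping reads.
 */
static void
journal_rows_fill(const struct txn_stmt *stmts, size_t n_stmts,
		  size_t n_applier_rows, size_t n_new_rows,
		  struct xrow_header **rows)
{
	struct xrow_header **remote_row = rows;
	struct xrow_header **local_row = rows + n_applier_rows;

	for (size_t i = 0; i < n_stmts; i++) {
		struct xrow_header *row = stmts[i].row;
		if (row == NULL)
			continue;
		if (row->replica_id == 0)
			*local_row++ = row;
		else
			*remote_row++ = row;
	}
	/* Same invariants as the asserts in txn_journal_entry_new(). */
	assert(remote_row == rows + n_applier_rows);
	assert(local_row == remote_row + n_new_rows);
	(void)n_new_rows; /* only read by the assert above */
}
```

Two cursors into one preallocated array avoid a second pass or a sort; the caller only has to know the two counts up front, which the transaction already tracks in n_applier_rows and n_new_rows.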
* [Tarantool-patches] [PATCH v13 11/11] box/journal: redesign journal operations 2020-03-19 9:05 [Tarantool-patches] [PATCH v13 00/11] box/journal: redesign sync and async writes Cyrill Gorcunov ` (9 preceding siblings ...) 2020-03-19 9:05 ` [Tarantool-patches] [PATCH v13 10/11] box/txn: move journal allocation into separate routine Cyrill Gorcunov @ 2020-03-19 9:05 ` Cyrill Gorcunov 2020-03-19 10:37 ` Konstantin Osipov 10 siblings, 1 reply; 17+ messages in thread From: Cyrill Gorcunov @ 2020-03-19 9:05 UTC (permalink / raw) To: tml Currently the journal provides only one method -- write, which implies a callback to trigger upon write completion (in contrary with 1.10 series where all commits were processing in synchronous way). Lets make difference between sync and async writes more notable: provide journal::write_async method which takes a callback data as an argument, while journal:write handle transaction in synchronous way. Redesing notes: 1) The callback for async write set once in journal creation. There is no need to carry callback in every journal entry. 
This allows us to save some memory 2) Both journal_write and journal_write_async require the caller to pass valid fiber->storage.txn, on error the txn must not be reset but preserved so callers would be able to run txn_rollback 3) txn_commit and txn_commit_async call txn_rollback where appropriate 4) no need to call journal_entry_complete on sync writes anymore 5) wal_write_in_wal_mode_none is too long, renamed to wal_write_none Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com> --- src/box/box.cc | 6 +-- src/box/journal.c | 39 +++++++++++++++----- src/box/journal.h | 94 +++++++++++++++++++++++++++++++++-------------- src/box/txn.c | 53 ++++++++++++-------------- src/box/vy_log.c | 4 +- src/box/wal.c | 74 ++++++++++++++++++++++++++++++++----- 6 files changed, 191 insertions(+), 79 deletions(-) diff --git a/src/box/box.cc b/src/box/box.cc index cf79affca..87ddbeb3a 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -322,7 +322,6 @@ recovery_journal_write(struct journal *base, { struct recovery_journal *journal = (struct recovery_journal *) base; entry->res = vclock_sum(journal->vclock); - journal_entry_complete(entry); return 0; } @@ -330,8 +329,9 @@ static void recovery_journal_create(struct vclock *v) { static struct recovery_journal journal; - - journal_create(&journal.base, recovery_journal_write, NULL); + journal_create(&journal.base, journal_no_write_async, + journal_no_write_async_cb, + recovery_journal_write, NULL); journal.vclock = v; journal_set(&journal.base); } diff --git a/src/box/journal.c b/src/box/journal.c index 11e78990d..036aad87a 100644 --- a/src/box/journal.c +++ b/src/box/journal.c @@ -32,6 +32,29 @@ #include <small/region.h> #include <diag.h> +int +journal_no_write_async(struct journal *journal, + struct journal_entry *entry, + void *complete_data) +{ + (void)complete_data; + (void)journal; + + say_error("journal: write_async invalid context"); + entry->res = -1; + return -1; +} + +void +journal_no_write_async_cb(struct journal_entry *entry, 
+ void *complete_data) +{ + (void)complete_data; + + say_error("journal: write_async_cb invalid context"); + entry->res = -1; +} + /** * Used to load from a memtx snapshot. LSN is not used, * but txn_commit() must work. @@ -41,21 +64,19 @@ dummy_journal_write(struct journal *journal, struct journal_entry *entry) { (void) journal; entry->res = 0; - journal_entry_complete(entry); return 0; } static struct journal dummy_journal = { - dummy_journal_write, - NULL, + .write_async = journal_no_write_async, + .write_async_cb = journal_no_write_async_cb, + .write = dummy_journal_write, }; struct journal *current_journal = &dummy_journal; struct journal_entry * -journal_entry_new(size_t n_rows, struct region *region, - journal_entry_complete_cb on_complete_cb, - void *on_complete_cb_data) +journal_entry_new(size_t n_rows, struct region *region) { struct journal_entry *entry; @@ -68,11 +89,11 @@ journal_entry_new(size_t n_rows, struct region *region, diag_set(OutOfMemory, size, "region", "struct journal_entry"); return NULL; } + + entry->complete_data = NULL; entry->approx_len = 0; entry->n_rows = n_rows; entry->res = -1; - entry->on_complete_cb = on_complete_cb; - entry->on_complete_cb_data = on_complete_cb_data; + return entry; } - diff --git a/src/box/journal.h b/src/box/journal.h index 64f167c6f..e1947edd1 100644 --- a/src/box/journal.h +++ b/src/box/journal.h @@ -34,6 +34,7 @@ #include <stdbool.h> #include "salad/stailq.h" #include "fiber.h" +#include "txn.h" #if defined(__cplusplus) extern "C" { @@ -60,17 +61,10 @@ struct journal_entry { * the committed transaction, on error is -1 */ int64_t res; - /** - * A journal entry finalization callback which is going to be called - * after the entry processing was finished in both cases: success - * or fail. Entry->res is set to a result value before the callback - * is fired. - */ - journal_entry_complete_cb on_complete_cb; /** * A journal entry completion callback argument. 
*/ - void *on_complete_cb_data; + void *complete_data; /** * Approximate size of this request when encoded. */ @@ -93,18 +87,7 @@ struct region; * @return NULL if out of memory, fiber diagnostics area is set */ struct journal_entry * -journal_entry_new(size_t n_rows, struct region *region, - journal_entry_complete_cb on_complete_cb, - void *on_complete_cb_data); - -/** - * Finalize a single entry. - */ -static inline void -journal_entry_complete(struct journal_entry *entry) -{ - entry->on_complete_cb(entry, entry->on_complete_cb_data); -} +journal_entry_new(size_t n_rows, struct region *region); /** * An API for an abstract journal for all transactions of this @@ -112,11 +95,33 @@ journal_entry_complete(struct journal_entry *entry) * synchronous replication. */ struct journal { + /** Asynchronous write */ + int (*write_async)(struct journal *journal, + struct journal_entry *entry, + void *complete_data); + + /** Asynchronous write completion */ + void (*write_async_cb)(struct journal_entry *entry, + void *complete_data); + + /** Synchronous write */ int (*write)(struct journal *journal, - struct journal_entry *req); + struct journal_entry *entry); + + /** Journal destroy */ void (*destroy)(struct journal *journal); }; +/** + * Finalize a single entry. + */ +static inline void +journal_async_complete(struct journal *journal, struct journal_entry *entry) +{ + assert(journal->write_async_cb != NULL); + journal->write_async_cb(entry, entry->complete_data); +} + /** * Depending on the step of recovery and instance configuration * points at a concrete implementation of the journal. @@ -124,16 +129,30 @@ struct journal { extern struct journal *current_journal; /** - * Send a single entry to write. + * Write a single entry to the journal in synchronous way. * - * @return 0 if write was scheduled or -1 in case of an error. + * @return 0 if write was processed by a backend or -1 in case of an error. 
*/ static inline int journal_write(struct journal_entry *entry) { + assert(in_txn() != NULL); return current_journal->write(current_journal, entry); } +/** + * Queue a single entry to the journal in asynchronous way. + * + * @return 0 if write was queued to a backend or -1 in case of an error. + */ +static inline int +journal_write_async(struct journal_entry *entry, void *complete_data) +{ + assert(in_txn() != NULL); + return current_journal->write_async(current_journal, entry, + complete_data); +} + /** * Change the current implementation of the journaling API. * Happens during life cycle of an instance: @@ -165,13 +184,34 @@ journal_set(struct journal *new_journal) static inline void journal_create(struct journal *journal, - int (*write)(struct journal *, struct journal_entry *), - void (*destroy)(struct journal *)) + int (*write_async)(struct journal *journal, + struct journal_entry *entry, + void *complete_data), + void (*write_async_cb)(struct journal_entry *entry, + void *complete_data), + int (*write)(struct journal *journal, + struct journal_entry *entry), + void (*destroy)(struct journal *journal)) { - journal->write = write; - journal->destroy = destroy; + journal->write_async = write_async; + journal->write_async_cb = write_async_cb; + journal->write = write; + journal->destroy = destroy; } +/** + * A stub to issue an error in case if asynchronous + * write is diabled in the backend. 
+ */ +extern int +journal_no_write_async(struct journal *journal, + struct journal_entry *entry, + void *complete_data); + +extern void +journal_no_write_async_cb(struct journal_entry *entry, + void *complete_data); + static inline bool journal_is_initialized(struct journal *journal) { diff --git a/src/box/txn.c b/src/box/txn.c index 11c20aceb..33302586d 100644 --- a/src/box/txn.c +++ b/src/box/txn.c @@ -487,7 +487,7 @@ txn_journal_entry_new(struct txn *txn) assert(txn->n_new_rows + txn->n_applier_rows > 0); req = journal_entry_new(txn->n_new_rows + txn->n_applier_rows, - &txn->region, txn_complete_async, txn); + &txn->region); if (req == NULL) return NULL; @@ -518,24 +518,6 @@ txn_journal_entry_new(struct txn *txn) return req; } -static int64_t -txn_write_to_wal(struct journal_entry *req) -{ - /* - * Send the entry to the journal. - * - * After this point the transaction must not be used - * so reset the corresponding key in the fiber storage. - */ - fiber_set_txn(fiber(), NULL); - if (journal_write(req) < 0) { - diag_set(ClientError, ER_WAL_IO); - diag_log(); - return -1; - } - return 0; -} - /* * Prepare a transaction using engines. */ @@ -608,7 +590,15 @@ txn_commit_async(struct txn *txn) return -1; } - return txn_write_to_wal(req); + if (journal_write_async(req, txn) != 0) { + txn_rollback(txn); + + diag_set(ClientError, ER_WAL_IO); + diag_log(); + return -1; + } + + return 0; } int @@ -636,21 +626,26 @@ txn_commit(struct txn *txn) return -1; } - if (txn_write_to_wal(req) != 0) + if (journal_write(req) != 0) { + txn_rollback(txn); + txn_free(txn); + + diag_set(ClientError, ER_WAL_IO); + diag_log(); return -1; + } - /* - * In case of non-yielding journal the transaction could already - * be done and there is nothing to wait in such cases. 
- */ if (!txn_has_flag(txn, TXN_IS_DONE)) { - bool cancellable = fiber_set_cancellable(false); - fiber_yield(); - fiber_set_cancellable(cancellable); + txn->signature = req->res; + txn_complete(txn); + fiber_set_txn(fiber(), NULL); } + int res = txn->signature >= 0 ? 0 : -1; - if (res != 0) + if (res != 0) { diag_set(ClientError, ER_WAL_IO); + diag_log(); + } /* Synchronous transactions are freed by the calling fiber. */ txn_free(txn); diff --git a/src/box/vy_log.c b/src/box/vy_log.c index cb291f3c8..92171ec21 100644 --- a/src/box/vy_log.c +++ b/src/box/vy_log.c @@ -815,8 +815,8 @@ vy_log_tx_flush(struct vy_log_tx *tx) tx_size++; size_t used = region_used(&fiber()->gc); - struct journal_entry *entry = journal_entry_new(tx_size, &fiber()->gc, - NULL, NULL); + + struct journal_entry *entry = journal_entry_new(tx_size, &fiber()->gc); if (entry == NULL) goto err; diff --git a/src/box/wal.c b/src/box/wal.c index 1668c9348..ba9f22e7a 100644 --- a/src/box/wal.c +++ b/src/box/wal.c @@ -32,6 +32,7 @@ #include "vclock.h" #include "fiber.h" +#include "txn.h" #include "fio.h" #include "errinj.h" #include "error.h" @@ -60,11 +61,17 @@ const char *wal_mode_STRS[] = { "none", "write", "fsync", NULL }; int wal_dir_lock = -1; +static int +wal_write_async(struct journal *, struct journal_entry *, void *); + static int wal_write(struct journal *, struct journal_entry *); static int -wal_write_in_wal_mode_none(struct journal *, struct journal_entry *); +wal_write_none_async(struct journal *, struct journal_entry *, void *); + +static int +wal_write_none(struct journal *, struct journal_entry *); /* * WAL writer - maintain a Write Ahead Log for every change @@ -253,9 +260,10 @@ xlog_write_entry(struct xlog *l, struct journal_entry *entry) static void tx_schedule_queue(struct stailq *queue) { + struct wal_writer *writer = &wal_writer_singleton; struct journal_entry *req, *tmp; stailq_foreach_entry_safe(req, tmp, queue, fifo) - journal_entry_complete(req); + 
journal_async_complete(&writer->base, req); } /** @@ -349,8 +357,14 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode, { writer->wal_mode = wal_mode; writer->wal_max_size = wal_max_size; - journal_create(&writer->base, wal_mode == WAL_NONE ? - wal_write_in_wal_mode_none : wal_write, NULL); + + journal_create(&writer->base, + wal_mode == WAL_NONE ? + wal_write_none_async : wal_write_async, + txn_complete_async, + wal_mode == WAL_NONE ? + wal_write_none : wal_write, + NULL); struct xlog_opts opts = xlog_opts_default; opts.sync_is_async = true; @@ -1170,9 +1184,18 @@ wal_writer_f(va_list ap) * to be written to disk. */ static int -wal_write(struct journal *journal, struct journal_entry *entry) +wal_write_async(struct journal *journal, struct journal_entry *entry, + void *complete_data) { struct wal_writer *writer = (struct wal_writer *) journal; + entry->complete_data = complete_data; + struct txn *txn = complete_data; + + /* + * After this point the transaction should not + * be bound to the fiber, it is handled by a callback. + */ + fiber_set_txn(fiber(), NULL); ERROR_INJECT(ERRINJ_WAL_IO, { goto fail; @@ -1220,14 +1243,39 @@ wal_write(struct journal *journal, struct journal_entry *entry) return 0; fail: + /* + * Don't forget to restore transaction + * in a fiber storage: the caller should + * be able to run a rollback procedure. + */ + fiber_set_txn(fiber(), txn); entry->res = -1; - journal_entry_complete(entry); return -1; } static int -wal_write_in_wal_mode_none(struct journal *journal, - struct journal_entry *entry) +wal_write(struct journal *journal, struct journal_entry *entry) +{ + struct txn *txn = in_txn(); + + /* + * We can reuse async WAL engine transparently + * to the caller.
+ */ + if (wal_write_async(journal, entry, txn) != 0) + return -1; + + bool cancellable = fiber_set_cancellable(false); + fiber_yield(); + fiber_set_cancellable(cancellable); + + return 0; +} + +static int +wal_write_none_async(struct journal *journal, + struct journal_entry *entry, + void *complete_data) { struct wal_writer *writer = (struct wal_writer *) journal; struct vclock vclock_diff; @@ -1237,10 +1285,18 @@ wal_write_in_wal_mode_none(struct journal *journal, vclock_merge(&writer->vclock, &vclock_diff); vclock_copy(&replicaset.vclock, &writer->vclock); entry->res = vclock_sum(&writer->vclock); - journal_entry_complete(entry); + + (void)complete_data; + fiber_set_txn(fiber(), NULL); return 0; } +static int +wal_write_none(struct journal *journal, struct journal_entry *entry) +{ + return wal_write_none_async(journal, entry, NULL); +} + void wal_init_vy_log() { -- 2.20.1
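In v13, wal_write() is a thin synchronous wrapper: it submits the entry through wal_write_async() and then yields the fiber until the completion callback has run. The pattern can be sketched in a single-threaded toy model, assuming no fibers and no WAL thread: every toy_* name below is hypothetical, the linked list stands in for the WAL queue, and the "wait" step drains that queue instead of calling fiber_yield().

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for struct journal_entry. */
struct toy_entry {
	int64_t res;            /* signature on success, -1 until written */
	bool done;              /* set by the completion callback */
	struct toy_entry *next; /* FIFO link, like the stailq in wal.c */
};

/* Hypothetical stand-in for struct journal plus the WAL queue. */
struct toy_journal {
	struct toy_entry *head;
	struct toy_entry *tail;
	int64_t vclock_sum;                      /* pretend LSN counter */
	void (*complete_cb)(struct toy_entry *); /* one per journal */
};

static void
toy_complete(struct toy_entry *entry)
{
	entry->done = true;
}

/* Asynchronous write: only queue the entry and return. */
static int
toy_write_async(struct toy_journal *j, struct toy_entry *entry)
{
	entry->res = -1;
	entry->done = false;
	entry->next = NULL;
	if (j->tail != NULL)
		j->tail->next = entry;
	else
		j->head = entry;
	j->tail = entry;
	return 0;
}

/*
 * Stand-in for the WAL thread: "write" every queued entry and
 * fire the journal-wide completion callback for each of them.
 */
static void
toy_flush(struct toy_journal *j)
{
	assert(j->complete_cb != NULL);
	while (j->head != NULL) {
		struct toy_entry *entry = j->head;
		j->head = entry->next;
		entry->res = ++j->vclock_sum;
		j->complete_cb(entry);
	}
	j->tail = NULL;
}

/*
 * Synchronous write built on the asynchronous path. The real
 * wal_write() yields the fiber here; the toy drains the queue.
 */
static int
toy_write(struct toy_journal *j, struct toy_entry *entry)
{
	if (toy_write_async(j, entry) != 0)
		return -1;
	while (!entry->done)
		toy_flush(j);
	return entry->res >= 0 ? 0 : -1;
}
```

In this toy, an entry queued earlier with toy_write_async() is completed before the synchronous one, in FIFO order, similar to the stailq walk in tx_schedule_queue().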
* Re: [Tarantool-patches] [PATCH v13 11/11] box/journal: redesign journal operations 2020-03-19 9:05 ` [Tarantool-patches] [PATCH v13 11/11] box/journal: redesign journal operations Cyrill Gorcunov @ 2020-03-19 10:37 ` Konstantin Osipov 2020-03-19 10:49 ` Cyrill Gorcunov 0 siblings, 1 reply; 17+ messages in thread From: Konstantin Osipov @ 2020-03-19 10:37 UTC To: Cyrill Gorcunov; +Cc: tml * Cyrill Gorcunov <gorcunov@gmail.com> [20/03/19 12:12]: > --- a/src/box/wal.c > +++ b/src/box/wal.c > @@ -32,6 +32,7 @@ > > #include "vclock.h" > #include "fiber.h" > +#include "txn.h" Please try to avoid including txn.h here. You can do it, I'm pretty sure. > #include "fio.h" > #include "errinj.h" > #include "error.h" > @@ -60,11 +61,17 @@ const char *wal_mode_STRS[] = { "none", "write", "fsync", NULL }; > -- Konstantin Osipov, Moscow, Russia
* Re: [Tarantool-patches] [PATCH v13 11/11] box/journal: redesign journal operations 2020-03-19 10:37 ` Konstantin Osipov @ 2020-03-19 10:49 ` Cyrill Gorcunov 2020-03-19 11:12 ` Konstantin Osipov 0 siblings, 1 reply; 17+ messages in thread From: Cyrill Gorcunov @ 2020-03-19 10:49 UTC To: Konstantin Osipov, tml On Thu, Mar 19, 2020 at 01:37:49PM +0300, Konstantin Osipov wrote: > * Cyrill Gorcunov <gorcunov@gmail.com> [20/03/19 12:12]: > > --- a/src/box/wal.c > > +++ b/src/box/wal.c > > @@ -32,6 +32,7 @@ > > > > #include "vclock.h" > > #include "fiber.h" > > +#include "txn.h" > > Please try to avoid including txn.h here. > > You can do it, I'm pretty sure. I use the fiber_set_txn and in_txn helpers, which are declared in txn.h. The basic idea is that setting fiber->storage.txn to NULL is done inside the wal engine and should be transparent to the caller (which implies that fiber->storage.txn is preserved in case of error, so a txn_rollback call is allowed). Also the sync write in the wal engine uses the async write internally, so I fetch the current txn via in_txn and pass it as completion data.
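The fiber-storage contract described above can be illustrated with a small sketch: the backend detaches the transaction from the current fiber before writing and puts it back only on failure, so the caller can still roll back. This is a single-threaded toy model assuming a global pointer in place of fiber->storage.txn; toy_in_txn(), toy_set_txn() and the other toy_* names are hypothetical stand-ins for in_txn() and fiber_set_txn(), not Tarantool code.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical transaction; only tracks whether it was rolled back. */
struct toy_txn {
	int rolled_back;
};

/* Stand-in for fiber->storage.txn of the one and only "fiber". */
static struct toy_txn *current_txn;

static struct toy_txn *
toy_in_txn(void)
{
	return current_txn;
}

static void
toy_set_txn(struct toy_txn *txn)
{
	current_txn = txn;
}

/*
 * Backend write: detaches the txn from the "fiber" and restores
 * it only on failure, so that the caller can run a rollback.
 */
static int
toy_backend_write(int simulate_error)
{
	struct toy_txn *txn = toy_in_txn();
	toy_set_txn(NULL);
	if (simulate_error) {
		toy_set_txn(txn);
		return -1;
	}
	/* On success the completion callback owns the txn from now on. */
	return 0;
}

static int
toy_commit(struct toy_txn *txn, int simulate_error)
{
	toy_set_txn(txn);
	if (toy_backend_write(simulate_error) != 0) {
		/* The slot was restored, so the rollback still sees the txn. */
		assert(toy_in_txn() == txn);
		txn->rolled_back = 1;
		toy_set_txn(NULL);
		return -1;
	}
	return 0;
}
```

In both paths the slot ends up empty; the difference is who clears it: the backend on success, the rollback path on failure.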
* Re: [Tarantool-patches] [PATCH v13 11/11] box/journal: redesign journal operations 2020-03-19 10:49 ` Cyrill Gorcunov @ 2020-03-19 11:12 ` Konstantin Osipov 2020-03-19 11:17 ` Cyrill Gorcunov 2020-03-19 16:20 ` Cyrill Gorcunov 0 siblings, 2 replies; 17+ messages in thread From: Konstantin Osipov @ 2020-03-19 11:12 UTC To: Cyrill Gorcunov; +Cc: tml * Cyrill Gorcunov <gorcunov@gmail.com> [20/03/19 13:51]: I really think it's not worth it, adding a circular dependency. You can declare struct txn in wal.cc and pass it around by pointer without using the header. The whole goal of this refactoring was to move fiber_set_txn() outside wal.cc, to txn.cc. Is it still impossible? Let's discuss. Module boundaries are really important long term. You add it for a valid reason, and some other patch will begin using txn inside wal for a silly reason. > On Thu, Mar 19, 2020 at 01:37:49PM +0300, Konstantin Osipov wrote: > > * Cyrill Gorcunov <gorcunov@gmail.com> [20/03/19 12:12]: > > > --- a/src/box/wal.c > > > +++ b/src/box/wal.c > > > @@ -32,6 +32,7 @@ > > > > > > #include "vclock.h" > > > #include "fiber.h" > > > +#include "txn.h" > > > > Please try to avoid including txn.h here. > > > > You can do it, I'm pretty sure. > > I use the fiber_set_txn and in_txn helpers, which > are declared in txn.h. > > The basic idea is that setting fiber->storage.txn > to NULL is done inside the wal engine and should > be transparent to the caller (which implies that > fiber->storage.txn is preserved in case of error, > so a txn_rollback call is allowed). > > Also the sync write in the wal engine uses the async write > internally, so I fetch the current txn via in_txn > and pass it as completion data. -- Konstantin Osipov, Moscow, Russia
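The forward-declaration suggestion relies on C incomplete types: a module can store and pass struct txn pointers after a bare "struct txn;" declaration, as long as it never dereferences them. A minimal sketch follows; the toy_* names are hypothetical, and both halves share one file only so the example compiles standalone. In a real split the first half would sit in the WAL module, with no txn.h include, and the second in the txn module.

```c
#include <assert.h>
#include <stddef.h>

/* ---- "wal" half: a forward declaration instead of txn.h ---- */
struct txn; /* incomplete type: pointers only, no field access */

struct toy_wal_entry {
	long long res;
	struct txn *txn; /* carried through, never dereferenced here */
};

/* Completion hook installed by the txn half (hypothetical). */
static void (*toy_wal_on_complete)(struct toy_wal_entry *entry);

static void
toy_wal_finish(struct toy_wal_entry *entry, long long signature)
{
	assert(toy_wal_on_complete != NULL);
	entry->res = signature;
	toy_wal_on_complete(entry);
}

/* ---- "txn" half: owns the full definition ---- */
struct txn {
	long long signature;
};

static void
toy_txn_complete(struct toy_wal_entry *entry)
{
	/* Only this half knows the layout of struct txn. */
	entry->txn->signature = entry->res;
}
```

Because toy_wal_finish() is compiled before struct txn is complete, any attempt to touch a txn field there would be a compile error, which is exactly the module boundary the review asks for.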
* Re: [Tarantool-patches] [PATCH v13 11/11] box/journal: redesign journal operations 2020-03-19 11:12 ` Konstantin Osipov @ 2020-03-19 11:17 ` Cyrill Gorcunov 2020-03-19 16:20 ` Cyrill Gorcunov 1 sibling, 0 replies; 17+ messages in thread From: Cyrill Gorcunov @ 2020-03-19 11:17 UTC To: Konstantin Osipov, tml On Thu, Mar 19, 2020 at 02:12:06PM +0300, Konstantin Osipov wrote: > * Cyrill Gorcunov <gorcunov@gmail.com> [20/03/19 13:51]: > > I really think it's not worth it, adding a circular dependency. > > You can declare struct txn in wal.cc and pass it around by > pointer without using the header. > > The whole goal of this refactoring was to move fiber_set_txn() > outside wal.cc, to txn.cc. > > Is it still impossible? Let's discuss. Kostya, let me try. I think I can move the txn-related stuff to journal.c, so wal.c won't need it. Gimme some time... > > Module boundaries are really important long term. You add it for a > valid reason, and some other patch will begin using txn inside wal > for a silly reason. Yes, I know what is bothering you and completely agree.
* Re: [Tarantool-patches] [PATCH v13 11/11] box/journal: redesign journal operations 2020-03-19 11:12 ` Konstantin Osipov 2020-03-19 11:17 ` Cyrill Gorcunov @ 2020-03-19 16:20 ` Cyrill Gorcunov 1 sibling, 0 replies; 17+ messages in thread From: Cyrill Gorcunov @ 2020-03-19 16:20 UTC To: Konstantin Osipov, tml On Thu, Mar 19, 2020 at 02:12:06PM +0300, Konstantin Osipov wrote: > * Cyrill Gorcunov <gorcunov@gmail.com> [20/03/19 13:51]: > > I really think it's not worth it, adding a circular dependency. > > You can declare struct txn in wal.cc and pass it around by > pointer without using the header. > > The whole goal of this refactoring was to move fiber_set_txn() > outside wal.cc, to txn.cc. > > Is it still impossible? Let's discuss. > > Module boundaries are really important long term. You add it for a > valid reason, and some other patch will begin using txn inside wal > for a silly reason. Kostya, please take a look: does it look better? I had to drop passing the completion data as a separate argument and assign it to the entry instead; this is close to what Gosha has, except more explicit, I think. At the same time, in wal_init I need to pass the write_async_cb function so that the journal saves it internally and doesn't waste memory on every journal entry. The whole series sits in gorcunov/gh-4031-txn_write_to_wal-14 --- From 39334ef60ad8101546bc7a177b07eed10b0c3d65 Mon Sep 17 00:00:00 2001 From: Cyrill Gorcunov <gorcunov@gmail.com> Date: Wed, 18 Mar 2020 22:36:37 +0300 Subject: [PATCH] box/journal: redesign journal operations Currently the journal provides only one method -- write, which implies a callback triggered upon write completion (in contrast with the 1.10 series, where all commits were processed synchronously). Let's make the difference between sync and async writes more explicit: provide a journal::write_async method which runs the completion function once the entry is written, while journal::write handles the transaction synchronously.
Redesign notes:

1) The callback for async writes is set once at journal creation. There is no need to carry the callback in every journal entry, which saves some memory;
2) txn_commit and txn_commit_async call txn_rollback where appropriate;
3) there is no need to call journal_entry_complete on sync writes anymore;
4) wal_write_in_wal_mode_none is too long, so it is renamed to wal_write_none;
5) the WAL engine uses async writes internally, but this is transparent to callers.

Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com> --- src/box/box.cc | 8 ++--- src/box/journal.c | 33 +++++++++++++----- src/box/journal.h | 87 +++++++++++++++++++++++++++++++---------------- src/box/txn.c | 61 ++++++++++++++++----------------- src/box/txn.h | 2 +- src/box/vy_log.c | 5 +-- src/box/wal.c | 61 ++++++++++++++++++++++++++------- src/box/wal.h | 4 +-- 8 files changed, 172 insertions(+), 89 deletions(-) diff --git a/src/box/box.cc b/src/box/box.cc index cf79affca..3a3bda78e 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -322,7 +322,6 @@ recovery_journal_write(struct journal *base, { struct recovery_journal *journal = (struct recovery_journal *) base; entry->res = vclock_sum(journal->vclock); - journal_entry_complete(entry); return 0; } @@ -330,8 +329,9 @@ static void recovery_journal_create(struct vclock *v) { static struct recovery_journal journal; - - journal_create(&journal.base, recovery_journal_write, NULL); + journal_create(&journal.base, journal_no_write_async, + journal_no_write_async_cb, + recovery_journal_write, NULL); journal.vclock = v; journal_set(&journal.base); } @@ -2353,7 +2353,7 @@ box_cfg_xc(void) int64_t wal_max_size = box_check_wal_max_size(cfg_geti64("wal_max_size")); enum wal_mode wal_mode = box_check_wal_mode(cfg_gets("wal_mode")); - if (wal_init(wal_mode, cfg_gets("wal_dir"), + if (wal_init(wal_mode, txn_complete_async, cfg_gets("wal_dir"), wal_max_size, &INSTANCE_UUID, on_wal_garbage_collection, on_wal_checkpoint_threshold) != 0) { diag_raise(); diff --git
a/src/box/journal.c b/src/box/journal.c index 11e78990d..6c4a0850a 100644 --- a/src/box/journal.c +++ b/src/box/journal.c @@ -32,6 +32,24 @@ #include <small/region.h> #include <diag.h> +int +journal_no_write_async(struct journal *journal, + struct journal_entry *entry) +{ + (void)journal; + + say_error("journal: write_async invalid context"); + entry->res = -1; + return -1; +} + +void +journal_no_write_async_cb(struct journal_entry *entry) +{ + say_error("journal: write_async_cb invalid context"); + entry->res = -1; +} + /** * Used to load from a memtx snapshot. LSN is not used, * but txn_commit() must work. @@ -41,21 +59,20 @@ dummy_journal_write(struct journal *journal, struct journal_entry *entry) { (void) journal; entry->res = 0; - journal_entry_complete(entry); return 0; } static struct journal dummy_journal = { - dummy_journal_write, - NULL, + .write_async = journal_no_write_async, + .write_async_cb = journal_no_write_async_cb, + .write = dummy_journal_write, }; struct journal *current_journal = &dummy_journal; struct journal_entry * journal_entry_new(size_t n_rows, struct region *region, - journal_entry_complete_cb on_complete_cb, - void *on_complete_cb_data) + void *complete_data) { struct journal_entry *entry; @@ -68,11 +85,11 @@ journal_entry_new(size_t n_rows, struct region *region, diag_set(OutOfMemory, size, "region", "struct journal_entry"); return NULL; } + + entry->complete_data = complete_data; entry->approx_len = 0; entry->n_rows = n_rows; entry->res = -1; - entry->on_complete_cb = on_complete_cb; - entry->on_complete_cb_data = on_complete_cb_data; + return entry; } - diff --git a/src/box/journal.h b/src/box/journal.h index 64f167c6f..045302906 100644 --- a/src/box/journal.h +++ b/src/box/journal.h @@ -34,6 +34,7 @@ #include <stdbool.h> #include "salad/stailq.h" #include "fiber.h" +#include "txn.h" #if defined(__cplusplus) extern "C" { @@ -42,9 +43,6 @@ extern "C" { struct xrow_header; struct journal_entry; -/** Journal entry finalization callback 
typedef. */ -typedef void (*journal_entry_complete_cb)(struct journal_entry *entry, void *data); - /** * An entry for an abstract journal. * Simply put, a write ahead log request. @@ -60,17 +58,10 @@ struct journal_entry { * the committed transaction, on error is -1 */ int64_t res; - /** - * A journal entry finalization callback which is going to be called - * after the entry processing was finished in both cases: success - * or fail. Entry->res is set to a result value before the callback - * is fired. - */ - journal_entry_complete_cb on_complete_cb; /** * A journal entry completion callback argument. */ - void *on_complete_cb_data; + void *complete_data; /** * Approximate size of this request when encoded. */ @@ -94,17 +85,7 @@ struct region; */ struct journal_entry * journal_entry_new(size_t n_rows, struct region *region, - journal_entry_complete_cb on_complete_cb, - void *on_complete_cb_data); - -/** - * Finalize a single entry. - */ -static inline void -journal_entry_complete(struct journal_entry *entry) -{ - entry->on_complete_cb(entry, entry->on_complete_cb_data); -} + void *complete_data); /** * An API for an abstract journal for all transactions of this @@ -112,11 +93,31 @@ journal_entry_complete(struct journal_entry *entry) * synchronous replication. */ struct journal { + /** Asynchronous write */ + int (*write_async)(struct journal *journal, + struct journal_entry *entry); + + /** Asynchronous write completion */ + void (*write_async_cb)(struct journal_entry *entry); + + /** Synchronous write */ int (*write)(struct journal *journal, - struct journal_entry *req); + struct journal_entry *entry); + + /** Journal destroy */ void (*destroy)(struct journal *journal); }; +/** + * Finalize a single entry. 
+ */ +static inline void +journal_async_complete(struct journal *journal, struct journal_entry *entry) +{ + assert(journal->write_async_cb != NULL); + journal->write_async_cb(entry); +} + /** * Depending on the step of recovery and instance configuration * points at a concrete implementation of the journal. @@ -124,9 +125,9 @@ struct journal { extern struct journal *current_journal; /** - * Send a single entry to write. + * Write a single entry to the journal in synchronous way. * - * @return 0 if write was scheduled or -1 in case of an error. + * @return 0 if write was processed by a backend or -1 in case of an error. */ static inline int journal_write(struct journal_entry *entry) @@ -134,6 +135,17 @@ journal_write(struct journal_entry *entry) return current_journal->write(current_journal, entry); } +/** + * Queue a single entry to the journal in asynchronous way. + * + * @return 0 if write was queued to a backend or -1 in case of an error. + */ +static inline int +journal_write_async(struct journal_entry *entry) +{ + return current_journal->write_async(current_journal, entry); +} + /** * Change the current implementation of the journaling API. * Happens during life cycle of an instance: @@ -165,13 +177,30 @@ journal_set(struct journal *new_journal) static inline void journal_create(struct journal *journal, - int (*write)(struct journal *, struct journal_entry *), - void (*destroy)(struct journal *)) + int (*write_async)(struct journal *journal, + struct journal_entry *entry), + void (*write_async_cb)(struct journal_entry *entry), + int (*write)(struct journal *journal, + struct journal_entry *entry), + void (*destroy)(struct journal *journal)) { - journal->write = write; - journal->destroy = destroy; + journal->write_async = write_async; + journal->write_async_cb = write_async_cb; + journal->write = write; + journal->destroy = destroy; } +/** + * A stub to issue an error in case if asynchronous + * write is disabled in the backend.
+ */ +extern int +journal_no_write_async(struct journal *journal, + struct journal_entry *entry); + +extern void +journal_no_write_async_cb(struct journal_entry *entry); + static inline bool journal_is_initialized(struct journal *journal) { diff --git a/src/box/txn.c b/src/box/txn.c index 11c20aceb..b42df3df6 100644 --- a/src/box/txn.c +++ b/src/box/txn.c @@ -463,9 +463,9 @@ txn_complete(struct txn *txn) } void -txn_complete_async(struct journal_entry *entry, void *complete_data) +txn_complete_async(struct journal_entry *entry) { - struct txn *txn = complete_data; + struct txn *txn = entry->complete_data; txn->signature = entry->res; /* * Some commit/rollback triggers require for in_txn fiber @@ -487,7 +487,7 @@ txn_journal_entry_new(struct txn *txn) assert(txn->n_new_rows + txn->n_applier_rows > 0); req = journal_entry_new(txn->n_new_rows + txn->n_applier_rows, - &txn->region, txn_complete_async, txn); + &txn->region, txn); if (req == NULL) return NULL; @@ -518,24 +518,6 @@ txn_journal_entry_new(struct txn *txn) return req; } -static int64_t -txn_write_to_wal(struct journal_entry *req) -{ - /* - * Send the entry to the journal. - * - * After this point the transaction must not be used - * so reset the corresponding key in the fiber storage. - */ - fiber_set_txn(fiber(), NULL); - if (journal_write(req) < 0) { - diag_set(ClientError, ER_WAL_IO); - diag_log(); - return -1; - } - return 0; -} - /* * Prepare a transaction using engines. 
*/ @@ -608,7 +590,17 @@ txn_commit_async(struct txn *txn) return -1; } - return txn_write_to_wal(req); + fiber_set_txn(fiber(), NULL); + if (journal_write_async(req) != 0) { + fiber_set_txn(fiber(), txn); + txn_rollback(txn); + + diag_set(ClientError, ER_WAL_IO); + diag_log(); + return -1; + } + + return 0; } int @@ -636,21 +628,28 @@ txn_commit(struct txn *txn) return -1; } - if (txn_write_to_wal(req) != 0) + fiber_set_txn(fiber(), NULL); + if (journal_write(req) != 0) { + fiber_set_txn(fiber(), txn); + txn_rollback(txn); + txn_free(txn); + + diag_set(ClientError, ER_WAL_IO); + diag_log(); return -1; + } - /* - * In case of non-yielding journal the transaction could already - * be done and there is nothing to wait in such cases. - */ if (!txn_has_flag(txn, TXN_IS_DONE)) { - bool cancellable = fiber_set_cancellable(false); - fiber_yield(); - fiber_set_cancellable(cancellable); + txn->signature = req->res; + txn_complete(txn); + fiber_set_txn(fiber(), NULL); } + int res = txn->signature >= 0 ? 0 : -1; - if (res != 0) + if (res != 0) { diag_set(ClientError, ER_WAL_IO); + diag_log(); + } /* Synchronous transactions are freed by the calling fiber. */ txn_free(txn); diff --git a/src/box/txn.h b/src/box/txn.h index 572c76d84..3f6d79d5c 100644 --- a/src/box/txn.h +++ b/src/box/txn.h @@ -292,7 +292,7 @@ txn_rollback(struct txn *txn); * Complete asynchronous transaction. */ void -txn_complete_async(struct journal_entry *entry, void *complete_data); +txn_complete_async(struct journal_entry *entry); /** * Submit a transaction to the journal. 
diff --git a/src/box/vy_log.c b/src/box/vy_log.c index cb291f3c8..9ead066af 100644 --- a/src/box/vy_log.c +++ b/src/box/vy_log.c @@ -815,8 +815,9 @@ vy_log_tx_flush(struct vy_log_tx *tx) tx_size++; size_t used = region_used(&fiber()->gc); - struct journal_entry *entry = journal_entry_new(tx_size, &fiber()->gc, - NULL, NULL); + + struct journal_entry *entry; + entry = journal_entry_new(tx_size, &fiber()->gc, NULL); if (entry == NULL) goto err; diff --git a/src/box/wal.c b/src/box/wal.c index 1668c9348..3b094b0e8 100644 --- a/src/box/wal.c +++ b/src/box/wal.c @@ -60,11 +60,17 @@ const char *wal_mode_STRS[] = { "none", "write", "fsync", NULL }; int wal_dir_lock = -1; +static int +wal_write_async(struct journal *, struct journal_entry *); + static int wal_write(struct journal *, struct journal_entry *); static int -wal_write_in_wal_mode_none(struct journal *, struct journal_entry *); +wal_write_none_async(struct journal *, struct journal_entry *); + +static int +wal_write_none(struct journal *, struct journal_entry *); /* * WAL writer - maintain a Write Ahead Log for every change @@ -253,9 +259,10 @@ xlog_write_entry(struct xlog *l, struct journal_entry *entry) static void tx_schedule_queue(struct stailq *queue) { + struct wal_writer *writer = &wal_writer_singleton; struct journal_entry *req, *tmp; stailq_foreach_entry_safe(req, tmp, queue, fifo) - journal_entry_complete(req); + journal_async_complete(&writer->base, req); } /** @@ -342,6 +349,7 @@ tx_notify_checkpoint(struct cmsg *msg) */ static void wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode, + void (*wall_async_cb)(struct journal_entry *entry), const char *wal_dirname, int64_t wal_max_size, const struct tt_uuid *instance_uuid, wal_on_garbage_collection_f on_garbage_collection, @@ -349,8 +357,14 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode, { writer->wal_mode = wal_mode; writer->wal_max_size = wal_max_size; - journal_create(&writer->base, wal_mode == WAL_NONE ? 
- wal_write_in_wal_mode_none : wal_write, NULL); + + journal_create(&writer->base, + wal_mode == WAL_NONE ? + wal_write_none_async : wal_write_async, + wall_async_cb, + wal_mode == WAL_NONE ? + wal_write_none : wal_write, + NULL); struct xlog_opts opts = xlog_opts_default; opts.sync_is_async = true; @@ -458,14 +472,14 @@ wal_open(struct wal_writer *writer) } int -wal_init(enum wal_mode wal_mode, const char *wal_dirname, - int64_t wal_max_size, const struct tt_uuid *instance_uuid, +wal_init(enum wal_mode wal_mode, void (*wall_async_cb)(struct journal_entry *entry), + const char *wal_dirname, int64_t wal_max_size, const struct tt_uuid *instance_uuid, wal_on_garbage_collection_f on_garbage_collection, wal_on_checkpoint_threshold_f on_checkpoint_threshold) { /* Initialize the state. */ struct wal_writer *writer = &wal_writer_singleton; - wal_writer_create(writer, wal_mode, wal_dirname, + wal_writer_create(writer, wal_mode, wall_async_cb, wal_dirname, wal_max_size, instance_uuid, on_garbage_collection, on_checkpoint_threshold); @@ -1170,7 +1184,7 @@ wal_writer_f(va_list ap) * to be written to disk. */ static int -wal_write(struct journal *journal, struct journal_entry *entry) +wal_write_async(struct journal *journal, struct journal_entry *entry) { struct wal_writer *writer = (struct wal_writer *) journal; @@ -1221,26 +1235,49 @@ wal_write(struct journal *journal, struct journal_entry *entry) fail: entry->res = -1; - journal_entry_complete(entry); return -1; } static int -wal_write_in_wal_mode_none(struct journal *journal, - struct journal_entry *entry) +wal_write(struct journal *journal, struct journal_entry *entry) +{ + /* + * We can reuse async WAL engine transparently + * to the caller. 
+ */ + if (wal_write_async(journal, entry) != 0) + return -1; + + bool cancellable = fiber_set_cancellable(false); + fiber_yield(); + fiber_set_cancellable(cancellable); + + return 0; +} + +static int +wal_write_none_async(struct journal *journal, + struct journal_entry *entry) { struct wal_writer *writer = (struct wal_writer *) journal; struct vclock vclock_diff; + vclock_create(&vclock_diff); wal_assign_lsn(&vclock_diff, &writer->vclock, entry->rows, entry->rows + entry->n_rows); vclock_merge(&writer->vclock, &vclock_diff); vclock_copy(&replicaset.vclock, &writer->vclock); entry->res = vclock_sum(&writer->vclock); - journal_entry_complete(entry); + return 0; } +static int +wal_write_none(struct journal *journal, struct journal_entry *entry) +{ + return wal_write_none_async(journal, entry); +} + void wal_init_vy_log() { diff --git a/src/box/wal.h b/src/box/wal.h index 76b44941a..11a66a20a 100644 --- a/src/box/wal.h +++ b/src/box/wal.h @@ -81,8 +81,8 @@ typedef void (*wal_on_checkpoint_threshold_f)(void); * Start WAL thread and initialize WAL writer. */ int -wal_init(enum wal_mode wal_mode, const char *wal_dirname, - int64_t wal_max_size, const struct tt_uuid *instance_uuid, +wal_init(enum wal_mode wal_mode, void (*wall_async_cb)(struct journal_entry *entry), + const char *wal_dirname, int64_t wal_max_size, const struct tt_uuid *instance_uuid, wal_on_garbage_collection_f on_garbage_collection, wal_on_checkpoint_threshold_f on_checkpoint_threshold); -- 2.20.1
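The v14 layout can be summarized with a toy model. The completion callback is stored once per journal (just as wal_init() now receives txn_complete_async), each entry carries only its complete_data pointer, and a backend without asynchronous support plugs in error stubs, roughly the way the recovery journal uses journal_no_write_async() and journal_no_write_async_cb(). All toy_* names are hypothetical, and the "asynchronous" backend here completes immediately instead of from a WAL thread.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-ins for struct journal_entry and struct journal. */
struct toy_entry {
	int64_t res;
	void *complete_data; /* per entry: e.g. the owning transaction */
};

struct toy_journal {
	int (*write_async)(struct toy_journal *j, struct toy_entry *e);
	void (*write_async_cb)(struct toy_entry *e); /* once per journal */
	int (*write)(struct toy_journal *j, struct toy_entry *e);
};

static int64_t toy_vclock_sum; /* pretend LSN counter */

/* Error stubs for sync-only backends (cf. journal_no_write_async). */
static int
toy_no_write_async(struct toy_journal *j, struct toy_entry *e)
{
	(void)j;
	e->res = -1;
	return -1;
}

static void
toy_no_write_async_cb(struct toy_entry *e)
{
	e->res = -1;
}

/* Sync-only backend, in the spirit of recovery_journal_write(). */
static int
toy_recovery_write(struct toy_journal *j, struct toy_entry *e)
{
	(void)j;
	e->res = toy_vclock_sum;
	return 0;
}

/* The callback comes from the journal, its argument from the entry. */
static void
toy_async_complete(struct toy_journal *j, struct toy_entry *e)
{
	assert(j->write_async_cb != NULL);
	j->write_async_cb(e);
}

struct toy_txn {
	int64_t signature;
};

/* Plays the role of txn_complete_async(). */
static void
toy_txn_complete_async(struct toy_entry *e)
{
	struct toy_txn *txn = e->complete_data;
	txn->signature = e->res;
}

/* Async backend that completes at once; a real WAL defers this. */
static int
toy_immediate_write_async(struct toy_journal *j, struct toy_entry *e)
{
	e->res = ++toy_vclock_sum;
	toy_async_complete(j, e);
	return 0;
}
```

Keeping write_async_cb in the journal rather than in every entry is the memory saving the commit message mentions: each entry keeps a single complete_data pointer instead of a callback plus its argument.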
end of thread

Thread overview: 17+ messages
2020-03-19 9:05 [Tarantool-patches] [PATCH v13 00/11] box/journal: redesign sync and async writes Cyrill Gorcunov
2020-03-19 9:05 ` [Tarantool-patches] [PATCH v13 01/11] box: recovery_journal_create -- set journal here Cyrill Gorcunov
2020-03-19 9:05 ` [Tarantool-patches] [PATCH v13 02/11] box: recovery_journal -- declare it as static Cyrill Gorcunov
2020-03-19 9:05 ` [Tarantool-patches] [PATCH v13 03/11] box/txn: move fiber_set_txn to header Cyrill Gorcunov
2020-03-19 9:05 ` [Tarantool-patches] [PATCH v13 04/11] box/txn: rename txn_write to txn_commit_async Cyrill Gorcunov
2020-03-19 9:05 ` [Tarantool-patches] [PATCH v13 05/11] box/txn: move setup of txn start to txn_prepare Cyrill Gorcunov
2020-03-19 9:05 ` [Tarantool-patches] [PATCH v13 06/11] box/txn: add txn_commit_nop helper Cyrill Gorcunov
2020-03-19 9:05 ` [Tarantool-patches] [PATCH v13 07/11] box/txn: rename txn_entry_complete_cb to txn_complete_async Cyrill Gorcunov
2020-03-19 9:05 ` [Tarantool-patches] [PATCH v13 08/11] box/txn: unweave txn_commit from txn_commit_async Cyrill Gorcunov
2020-03-19 9:05 ` [Tarantool-patches] [PATCH v13 09/11] box/txn: clear fiber storage right before journal write Cyrill Gorcunov
2020-03-19 9:05 ` [Tarantool-patches] [PATCH v13 10/11] box/txn: move journal allocation into separate routine Cyrill Gorcunov
2020-03-19 9:05 ` [Tarantool-patches] [PATCH v13 11/11] box/journal: redesign journal operations Cyrill Gorcunov
2020-03-19 10:37 ` Konstantin Osipov
2020-03-19 10:49 ` Cyrill Gorcunov
2020-03-19 11:12 ` Konstantin Osipov
2020-03-19 11:17 ` Cyrill Gorcunov
2020-03-19 16:20 ` Cyrill Gorcunov