Tarantool development patches archive
 help / color / mirror / Atom feed
* [Tarantool-patches] [PATCH v13 00/11] box/journal: redesign sync and async writes
@ 2020-03-19  9:05 Cyrill Gorcunov
  2020-03-19  9:05 ` [Tarantool-patches] [PATCH v13 01/11] box: recovery_journal_create -- set journal here Cyrill Gorcunov
                   ` (10 more replies)
  0 siblings, 11 replies; 17+ messages in thread
From: Cyrill Gorcunov @ 2020-03-19  9:05 UTC (permalink / raw)
  To: tml

Kostya, take a look please, once time permit. I set your Acks where
appropriate, please ping me if you disagree with something.

https://gitlab.com/tarantool/tarantool/pipelines/127699966

branch gorcunov/gh-4031-txn_write_to_wal-13

Cyrill Gorcunov (11):
  box: recovery_journal_create -- set journal here
  box: recovery_journal -- declare it as static
  box/txn: move fiber_set_txn to header
  box/txn: rename txn_write to txn_commit_async
  box/txn: move setup of txn start to txn_prepare
  box/txn: add txn_commit_nop helper
  box/txn: rename txn_entry_complete_cb to txn_complete_async
  box/txn: unweave txn_commit from txn_commit_async
  box/txn: clear fiber storage right before journal write
  box/txn: move journal allocation into separate routine
  box/journal: redesign journal operations

 src/box/applier.cc |   2 +-
 src/box/box.cc     |  21 ++++---
 src/box/journal.c  |  39 +++++++++---
 src/box/journal.h  |  94 ++++++++++++++++++++---------
 src/box/txn.c      | 144 +++++++++++++++++++++++++++++----------------
 src/box/txn.h      |  16 ++++-
 src/box/vy_log.c   |   4 +-
 src/box/wal.c      |  74 ++++++++++++++++++++---
 8 files changed, 282 insertions(+), 112 deletions(-)


base-commit: 1f7e7aa2bf47445dffc713df336288676b927445
-- 
2.20.1

^ permalink raw reply	[flat|nested] 17+ messages in thread

* [Tarantool-patches] [PATCH v13 01/11] box: recovery_journal_create -- set journal here
  2020-03-19  9:05 [Tarantool-patches] [PATCH v13 00/11] box/journal: redesign sync and async writes Cyrill Gorcunov
@ 2020-03-19  9:05 ` Cyrill Gorcunov
  2020-03-19  9:05 ` [Tarantool-patches] [PATCH v13 02/11] box: recovery_journal -- declare it as static Cyrill Gorcunov
                   ` (9 subsequent siblings)
  10 siblings, 0 replies; 17+ messages in thread
From: Cyrill Gorcunov @ 2020-03-19  9:05 UTC (permalink / raw)
  To: tml

Allows to eliminate code duplication.

Acked-by: Konstantin Osipov <kostja.osipov@gmail.com>
Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
 src/box/box.cc | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index 09dd67ab4..eb5931e37 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -331,6 +331,7 @@ recovery_journal_create(struct recovery_journal *journal, struct vclock *v)
 {
 	journal_create(&journal->base, recovery_journal_write, NULL);
 	journal->vclock = v;
+	journal_set(&journal->base);
 }
 
 static void
@@ -2055,7 +2056,6 @@ bootstrap_from_master(struct replica *master)
 	engine_begin_final_recovery_xc();
 	struct recovery_journal journal;
 	recovery_journal_create(&journal, &replicaset.vclock);
-	journal_set(&journal.base);
 
 	if (!replication_anon) {
 		applier_resume_to_state(applier, APPLIER_JOINED,
@@ -2221,7 +2221,6 @@ local_recovery(const struct tt_uuid *instance_uuid,
 
 	struct recovery_journal journal;
 	recovery_journal_create(&journal, &recovery->vclock);
-	journal_set(&journal.base);
 
 	/*
 	 * We explicitly request memtx to recover its
-- 
2.20.1

^ permalink raw reply	[flat|nested] 17+ messages in thread

* [Tarantool-patches] [PATCH v13 02/11] box: recovery_journal -- declare it as static
  2020-03-19  9:05 [Tarantool-patches] [PATCH v13 00/11] box/journal: redesign sync and async writes Cyrill Gorcunov
  2020-03-19  9:05 ` [Tarantool-patches] [PATCH v13 01/11] box: recovery_journal_create -- set journal here Cyrill Gorcunov
@ 2020-03-19  9:05 ` Cyrill Gorcunov
  2020-03-19  9:05 ` [Tarantool-patches] [PATCH v13 03/11] box/txn: move fiber_set_txn to header Cyrill Gorcunov
                   ` (8 subsequent siblings)
  10 siblings, 0 replies; 17+ messages in thread
From: Cyrill Gorcunov @ 2020-03-19  9:05 UTC (permalink / raw)
  To: tml

There is no need for several instances of
recovery journal variable. Lets make it
statically allocated inside recovery_journal_create
routine.

And drop the inline annotation because there is
absolutely no need for this routine being inline.

Suggested-by: Konstantin Osipov <kostja.osipov@gmail.com>
Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
 src/box/box.cc | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index eb5931e37..cf79affca 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -326,12 +326,14 @@ recovery_journal_write(struct journal *base,
 	return 0;
 }
 
-static inline void
-recovery_journal_create(struct recovery_journal *journal, struct vclock *v)
+static void
+recovery_journal_create(struct vclock *v)
 {
-	journal_create(&journal->base, recovery_journal_write, NULL);
-	journal->vclock = v;
-	journal_set(&journal->base);
+	static struct recovery_journal journal;
+
+	journal_create(&journal.base, recovery_journal_write, NULL);
+	journal.vclock = v;
+	journal_set(&journal.base);
 }
 
 static void
@@ -2054,8 +2056,7 @@ bootstrap_from_master(struct replica *master)
 	 * Process final data (WALs).
 	 */
 	engine_begin_final_recovery_xc();
-	struct recovery_journal journal;
-	recovery_journal_create(&journal, &replicaset.vclock);
+	recovery_journal_create(&replicaset.vclock);
 
 	if (!replication_anon) {
 		applier_resume_to_state(applier, APPLIER_JOINED,
@@ -2219,8 +2220,7 @@ local_recovery(const struct tt_uuid *instance_uuid,
 	memtx = (struct memtx_engine *)engine_by_name("memtx");
 	assert(memtx != NULL);
 
-	struct recovery_journal journal;
-	recovery_journal_create(&journal, &recovery->vclock);
+	recovery_journal_create(&recovery->vclock);
 
 	/*
 	 * We explicitly request memtx to recover its
-- 
2.20.1

^ permalink raw reply	[flat|nested] 17+ messages in thread

* [Tarantool-patches] [PATCH v13 03/11] box/txn: move fiber_set_txn to header
  2020-03-19  9:05 [Tarantool-patches] [PATCH v13 00/11] box/journal: redesign sync and async writes Cyrill Gorcunov
  2020-03-19  9:05 ` [Tarantool-patches] [PATCH v13 01/11] box: recovery_journal_create -- set journal here Cyrill Gorcunov
  2020-03-19  9:05 ` [Tarantool-patches] [PATCH v13 02/11] box: recovery_journal -- declare it as static Cyrill Gorcunov
@ 2020-03-19  9:05 ` Cyrill Gorcunov
  2020-03-19  9:05 ` [Tarantool-patches] [PATCH v13 04/11] box/txn: rename txn_write to txn_commit_async Cyrill Gorcunov
                   ` (7 subsequent siblings)
  10 siblings, 0 replies; 17+ messages in thread
From: Cyrill Gorcunov @ 2020-03-19  9:05 UTC (permalink / raw)
  To: tml

We will use it inside wal engine.

Moreover we already have a "get" function in this header
which is named in_txt(). Having both get/set in one place
should be more consistent.

Acked-by: Konstantin Osipov <kostja.osipov@gmail.com>
Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
 src/box/txn.c | 6 ------
 src/box/txn.h | 7 +++++++
 2 files changed, 7 insertions(+), 6 deletions(-)

diff --git a/src/box/txn.c b/src/box/txn.c
index a4ca48224..6799f6c4b 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -49,12 +49,6 @@ txn_on_yield(struct trigger *trigger, void *event);
 static void
 txn_run_rollback_triggers(struct txn *txn, struct rlist *triggers);
 
-static inline void
-fiber_set_txn(struct fiber *fiber, struct txn *txn)
-{
-	fiber->storage.txn = txn;
-}
-
 static int
 txn_add_redo(struct txn *txn, struct txn_stmt *stmt, struct request *request)
 {
diff --git a/src/box/txn.h b/src/box/txn.h
index ae2c3a58f..7a7e52954 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -256,6 +256,13 @@ in_txn(void)
 	return fiber()->storage.txn;
 }
 
+/* Set to the current transaction (if any) */
+static inline void
+fiber_set_txn(struct fiber *fiber, struct txn *txn)
+{
+	fiber->storage.txn = txn;
+}
+
 /**
  * Start a transaction explicitly.
  * @pre no transaction is active
-- 
2.20.1

^ permalink raw reply	[flat|nested] 17+ messages in thread

* [Tarantool-patches] [PATCH v13 04/11] box/txn: rename txn_write to txn_commit_async
  2020-03-19  9:05 [Tarantool-patches] [PATCH v13 00/11] box/journal: redesign sync and async writes Cyrill Gorcunov
                   ` (2 preceding siblings ...)
  2020-03-19  9:05 ` [Tarantool-patches] [PATCH v13 03/11] box/txn: move fiber_set_txn to header Cyrill Gorcunov
@ 2020-03-19  9:05 ` Cyrill Gorcunov
  2020-03-19  9:05 ` [Tarantool-patches] [PATCH v13 05/11] box/txn: move setup of txn start to txn_prepare Cyrill Gorcunov
                   ` (6 subsequent siblings)
  10 siblings, 0 replies; 17+ messages in thread
From: Cyrill Gorcunov @ 2020-03-19  9:05 UTC (permalink / raw)
  To: tml

To reflect the fact tha we're don't waiting for
transaction to complete but rely on completion
callback.

Acked-by: Konstantin Osipov <kostja.osipov@gmail.com>
Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
 src/box/applier.cc | 2 +-
 src/box/txn.c      | 4 ++--
 src/box/txn.h      | 2 +-
 3 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index ff40e03c6..8666a3a98 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -805,7 +805,7 @@ applier_apply_tx(struct stailq *rows)
 	trigger_create(on_commit, applier_txn_commit_cb, NULL, NULL);
 	txn_on_commit(txn, on_commit);
 
-	if (txn_write(txn) < 0)
+	if (txn_commit_async(txn) < 0)
 		goto fail;
 
 	/* Transaction was sent to journal so promote vclock. */
diff --git a/src/box/txn.c b/src/box/txn.c
index 6799f6c4b..9f61303ab 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -558,7 +558,7 @@ txn_prepare(struct txn *txn)
 }
 
 int
-txn_write(struct txn *txn)
+txn_commit_async(struct txn *txn)
 {
 	if (txn_prepare(txn) != 0) {
 		txn_rollback(txn);
@@ -586,7 +586,7 @@ txn_commit(struct txn *txn)
 {
 	txn->fiber = fiber();
 
-	if (txn_write(txn) != 0)
+	if (txn_commit_async(txn) != 0)
 		return -1;
 	/*
 	 * In case of non-yielding journal the transaction could already
diff --git a/src/box/txn.h b/src/box/txn.h
index 7a7e52954..b950e2e18 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -300,7 +300,7 @@ txn_rollback(struct txn *txn);
  * freed.
  */
 int
-txn_write(struct txn *txn);
+txn_commit_async(struct txn *txn);
 
 /**
  * Most txns don't have triggers, and txn objects
-- 
2.20.1

^ permalink raw reply	[flat|nested] 17+ messages in thread

* [Tarantool-patches] [PATCH v13 05/11] box/txn: move setup of txn start to txn_prepare
  2020-03-19  9:05 [Tarantool-patches] [PATCH v13 00/11] box/journal: redesign sync and async writes Cyrill Gorcunov
                   ` (3 preceding siblings ...)
  2020-03-19  9:05 ` [Tarantool-patches] [PATCH v13 04/11] box/txn: rename txn_write to txn_commit_async Cyrill Gorcunov
@ 2020-03-19  9:05 ` Cyrill Gorcunov
  2020-03-19  9:05 ` [Tarantool-patches] [PATCH v13 06/11] box/txn: add txn_commit_nop helper Cyrill Gorcunov
                   ` (5 subsequent siblings)
  10 siblings, 0 replies; 17+ messages in thread
From: Cyrill Gorcunov @ 2020-03-19  9:05 UTC (permalink / raw)
  To: tml

For unification sake, we will handle nop transactions
via common helper for both sync and async cases.

Acked-by: Konstantin Osipov <kostja.osipov@gmail.com>
Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
 src/box/txn.c | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/src/box/txn.c b/src/box/txn.c
index 9f61303ab..6bb1b06ed 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -554,6 +554,8 @@ txn_prepare(struct txn *txn)
 	trigger_clear(&txn->fiber_on_stop);
 	if (!txn_has_flag(txn, TXN_CAN_YIELD))
 		trigger_clear(&txn->fiber_on_yield);
+
+	txn->start_tm = ev_monotonic_now(loop());
 	return 0;
 }
 
@@ -569,7 +571,6 @@ txn_commit_async(struct txn *txn)
 	 * After this point the transaction must not be used
 	 * so reset the corresponding key in the fiber storage.
 	 */
-	txn->start_tm = ev_monotonic_now(loop());
 	if (txn->n_new_rows + txn->n_applier_rows == 0) {
 		/* Nothing to do. */
 		txn->signature = 0;
-- 
2.20.1

^ permalink raw reply	[flat|nested] 17+ messages in thread

* [Tarantool-patches] [PATCH v13 06/11] box/txn: add txn_commit_nop helper
  2020-03-19  9:05 [Tarantool-patches] [PATCH v13 00/11] box/journal: redesign sync and async writes Cyrill Gorcunov
                   ` (4 preceding siblings ...)
  2020-03-19  9:05 ` [Tarantool-patches] [PATCH v13 05/11] box/txn: move setup of txn start to txn_prepare Cyrill Gorcunov
@ 2020-03-19  9:05 ` Cyrill Gorcunov
  2020-03-19  9:05 ` [Tarantool-patches] [PATCH v13 07/11] box/txn: rename txn_entry_complete_cb to txn_complete_async Cyrill Gorcunov
                   ` (4 subsequent siblings)
  10 siblings, 0 replies; 17+ messages in thread
From: Cyrill Gorcunov @ 2020-03-19  9:05 UTC (permalink / raw)
  To: tml

To reuse in sync trancastion once journal
redesign is complete.

Acked-by: Konstantin Osipov <kostja.osipov@gmail.com>
Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
 src/box/txn.c | 26 +++++++++++++++++++-------
 1 file changed, 19 insertions(+), 7 deletions(-)

diff --git a/src/box/txn.c b/src/box/txn.c
index 6bb1b06ed..7a2a4877e 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -559,6 +559,22 @@ txn_prepare(struct txn *txn)
 	return 0;
 }
 
+/**
+ * Complete transaction early if it is barely nop.
+ */
+static bool
+txn_commit_nop(struct txn *txn)
+{
+	if (txn->n_new_rows + txn->n_applier_rows == 0) {
+		txn->signature = 0;
+		txn_complete(txn);
+		fiber_set_txn(fiber(), NULL);
+		return true;
+	}
+
+	return false;
+}
+
 int
 txn_commit_async(struct txn *txn)
 {
@@ -567,17 +583,13 @@ txn_commit_async(struct txn *txn)
 		return -1;
 	}
 
+	if (txn_commit_nop(txn))
+		return 0;
+
 	/*
 	 * After this point the transaction must not be used
 	 * so reset the corresponding key in the fiber storage.
 	 */
-	if (txn->n_new_rows + txn->n_applier_rows == 0) {
-		/* Nothing to do. */
-		txn->signature = 0;
-		txn_complete(txn);
-		fiber_set_txn(fiber(), NULL);
-		return 0;
-	}
 	fiber_set_txn(fiber(), NULL);
 	return txn_write_to_wal(txn);
 }
-- 
2.20.1

^ permalink raw reply	[flat|nested] 17+ messages in thread

* [Tarantool-patches] [PATCH v13 07/11] box/txn: rename txn_entry_complete_cb to txn_complete_async
  2020-03-19  9:05 [Tarantool-patches] [PATCH v13 00/11] box/journal: redesign sync and async writes Cyrill Gorcunov
                   ` (5 preceding siblings ...)
  2020-03-19  9:05 ` [Tarantool-patches] [PATCH v13 06/11] box/txn: add txn_commit_nop helper Cyrill Gorcunov
@ 2020-03-19  9:05 ` Cyrill Gorcunov
  2020-03-19  9:05 ` [Tarantool-patches] [PATCH v13 08/11] box/txn: unweave txn_commit from txn_commit_async Cyrill Gorcunov
                   ` (3 subsequent siblings)
  10 siblings, 0 replies; 17+ messages in thread
From: Cyrill Gorcunov @ 2020-03-19  9:05 UTC (permalink / raw)
  To: tml

We will use this function inside wal engine right
after journal redesign is complete.

Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
 src/box/txn.c | 8 ++++----
 src/box/txn.h | 7 +++++++
 2 files changed, 11 insertions(+), 4 deletions(-)

diff --git a/src/box/txn.c b/src/box/txn.c
index 7a2a4877e..60da5b564 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -462,10 +462,10 @@ txn_complete(struct txn *txn)
 	}
 }
 
-static void
-txn_entry_complete_cb(struct journal_entry *entry, void *data)
+void
+txn_complete_async(struct journal_entry *entry, void *complete_data)
 {
-	struct txn *txn = data;
+	struct txn *txn = complete_data;
 	txn->signature = entry->res;
 	/*
 	 * Some commit/rollback triggers require for in_txn fiber
@@ -487,7 +487,7 @@ txn_write_to_wal(struct txn *txn)
 	struct journal_entry *req = journal_entry_new(txn->n_new_rows +
 						      txn->n_applier_rows,
 						      &txn->region,
-						      txn_entry_complete_cb,
+						      txn_complete_async,
 						      txn);
 	if (req == NULL) {
 		txn_rollback(txn);
diff --git a/src/box/txn.h b/src/box/txn.h
index b950e2e18..572c76d84 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -44,6 +44,7 @@ extern "C" {
 /** box statistics */
 extern struct rmean *rmean_box;
 
+struct journal_entry;
 struct engine;
 struct space;
 struct tuple;
@@ -287,6 +288,12 @@ txn_commit(struct txn *txn);
 void
 txn_rollback(struct txn *txn);
 
+/**
+ * Complete asynchronous transaction.
+ */
+void
+txn_complete_async(struct journal_entry *entry, void *complete_data);
+
 /**
  * Submit a transaction to the journal.
  * @pre txn == in_txn()
-- 
2.20.1

^ permalink raw reply	[flat|nested] 17+ messages in thread

* [Tarantool-patches] [PATCH v13 08/11] box/txn: unweave txn_commit from txn_commit_async
  2020-03-19  9:05 [Tarantool-patches] [PATCH v13 00/11] box/journal: redesign sync and async writes Cyrill Gorcunov
                   ` (6 preceding siblings ...)
  2020-03-19  9:05 ` [Tarantool-patches] [PATCH v13 07/11] box/txn: rename txn_entry_complete_cb to txn_complete_async Cyrill Gorcunov
@ 2020-03-19  9:05 ` Cyrill Gorcunov
  2020-03-19  9:05 ` [Tarantool-patches] [PATCH v13 09/11] box/txn: clear fiber storage right before journal write Cyrill Gorcunov
                   ` (2 subsequent siblings)
  10 siblings, 0 replies; 17+ messages in thread
From: Cyrill Gorcunov @ 2020-03-19  9:05 UTC (permalink / raw)
  To: tml

We gonna diverge sync and async code flow thus lets
make txn_commit to not use txn_commit_async.

Fixes #4031

Acked-by: Konstantin Osipov <kostja.osipov@gmail.com>
Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
 src/box/txn.c | 15 ++++++++++++++-
 1 file changed, 14 insertions(+), 1 deletion(-)

diff --git a/src/box/txn.c b/src/box/txn.c
index 60da5b564..06043a2d8 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -599,8 +599,21 @@ txn_commit(struct txn *txn)
 {
 	txn->fiber = fiber();
 
-	if (txn_commit_async(txn) != 0)
+	if (txn_prepare(txn) != 0) {
+		txn_rollback(txn);
+		txn_free(txn);
+		return -1;
+	}
+
+	if (txn_commit_nop(txn)) {
+		txn_free(txn);
+		return 0;
+	}
+
+	fiber_set_txn(fiber(), NULL);
+	if (txn_write_to_wal(txn) != 0)
 		return -1;
+
 	/*
 	 * In case of non-yielding journal the transaction could already
 	 * be done and there is nothing to wait in such cases.
-- 
2.20.1

^ permalink raw reply	[flat|nested] 17+ messages in thread

* [Tarantool-patches] [PATCH v13 09/11] box/txn: clear fiber storage right before journal write
  2020-03-19  9:05 [Tarantool-patches] [PATCH v13 00/11] box/journal: redesign sync and async writes Cyrill Gorcunov
                   ` (7 preceding siblings ...)
  2020-03-19  9:05 ` [Tarantool-patches] [PATCH v13 08/11] box/txn: unweave txn_commit from txn_commit_async Cyrill Gorcunov
@ 2020-03-19  9:05 ` Cyrill Gorcunov
  2020-03-19  9:05 ` [Tarantool-patches] [PATCH v13 10/11] box/txn: move journal allocation into separate routine Cyrill Gorcunov
  2020-03-19  9:05 ` [Tarantool-patches] [PATCH v13 11/11] box/journal: redesign journal operations Cyrill Gorcunov
  10 siblings, 0 replies; 17+ messages in thread
From: Cyrill Gorcunov @ 2020-03-19  9:05 UTC (permalink / raw)
  To: tml

Otherwise we won't be able to make a rollback in case
of journal_entry_new allocation failure.

Acked-by: Konstantin Osipov <kostja.osipov@gmail.com>
Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
 src/box/txn.c | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/src/box/txn.c b/src/box/txn.c
index 06043a2d8..4e23e9828 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -513,7 +513,13 @@ txn_write_to_wal(struct txn *txn)
 	assert(remote_row == req->rows + txn->n_applier_rows);
 	assert(local_row == remote_row + txn->n_new_rows);
 
-	/* Send the entry to the journal. */
+	/*
+	 * Send the entry to the journal.
+	 *
+	 * After this point the transaction must not be used
+	 * so reset the corresponding key in the fiber storage.
+	 */
+	fiber_set_txn(fiber(), NULL);
 	if (journal_write(req) < 0) {
 		diag_set(ClientError, ER_WAL_IO);
 		diag_log();
@@ -586,11 +592,6 @@ txn_commit_async(struct txn *txn)
 	if (txn_commit_nop(txn))
 		return 0;
 
-	/*
-	 * After this point the transaction must not be used
-	 * so reset the corresponding key in the fiber storage.
-	 */
-	fiber_set_txn(fiber(), NULL);
 	return txn_write_to_wal(txn);
 }
 
@@ -610,7 +611,6 @@ txn_commit(struct txn *txn)
 		return 0;
 	}
 
-	fiber_set_txn(fiber(), NULL);
 	if (txn_write_to_wal(txn) != 0)
 		return -1;
 
-- 
2.20.1

^ permalink raw reply	[flat|nested] 17+ messages in thread

* [Tarantool-patches] [PATCH v13 10/11] box/txn: move journal allocation into separate routine
  2020-03-19  9:05 [Tarantool-patches] [PATCH v13 00/11] box/journal: redesign sync and async writes Cyrill Gorcunov
                   ` (8 preceding siblings ...)
  2020-03-19  9:05 ` [Tarantool-patches] [PATCH v13 09/11] box/txn: clear fiber storage right before journal write Cyrill Gorcunov
@ 2020-03-19  9:05 ` Cyrill Gorcunov
  2020-03-19  9:05 ` [Tarantool-patches] [PATCH v13 11/11] box/journal: redesign journal operations Cyrill Gorcunov
  10 siblings, 0 replies; 17+ messages in thread
From: Cyrill Gorcunov @ 2020-03-19  9:05 UTC (permalink / raw)
  To: tml

This makes code easier to read and allows to reuse
txn allocation in sync\async writes.

Acked-by: Konstantin Osipov <kostja.osipov@gmail.com>
Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
 src/box/txn.c | 57 ++++++++++++++++++++++++++++++++++++---------------
 1 file changed, 41 insertions(+), 16 deletions(-)

diff --git a/src/box/txn.c b/src/box/txn.c
index 4e23e9828..11c20aceb 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -478,41 +478,49 @@ txn_complete_async(struct journal_entry *entry, void *complete_data)
 	fiber_set_txn(fiber(), NULL);
 }
 
-static int64_t
-txn_write_to_wal(struct txn *txn)
+static struct journal_entry *
+txn_journal_entry_new(struct txn *txn)
 {
+	struct journal_entry *req;
+	struct txn_stmt *stmt;
+
 	assert(txn->n_new_rows + txn->n_applier_rows > 0);
 
-	/* Prepare a journal entry. */
-	struct journal_entry *req = journal_entry_new(txn->n_new_rows +
-						      txn->n_applier_rows,
-						      &txn->region,
-						      txn_complete_async,
-						      txn);
-	if (req == NULL) {
-		txn_rollback(txn);
-		return -1;
-	}
+	req = journal_entry_new(txn->n_new_rows + txn->n_applier_rows,
+				&txn->region, txn_complete_async, txn);
+	if (req == NULL)
+		return NULL;
 
-	struct txn_stmt *stmt;
 	struct xrow_header **remote_row = req->rows;
 	struct xrow_header **local_row = req->rows + txn->n_applier_rows;
+
 	stailq_foreach_entry(stmt, &txn->stmts, next) {
 		if (stmt->has_triggers) {
 			txn_init_triggers(txn);
 			rlist_splice(&txn->on_commit, &stmt->on_commit);
 		}
+
+		/* A read (e.g. select) request */
 		if (stmt->row == NULL)
-			continue; /* A read (e.g. select) request */
+			continue;
+
 		if (stmt->row->replica_id == 0)
 			*local_row++ = stmt->row;
 		else
 			*remote_row++ = stmt->row;
+
 		req->approx_len += xrow_approx_len(stmt->row);
 	}
+
 	assert(remote_row == req->rows + txn->n_applier_rows);
 	assert(local_row == remote_row + txn->n_new_rows);
 
+	return req;
+}
+
+static int64_t
+txn_write_to_wal(struct journal_entry *req)
+{
 	/*
 	 * Send the entry to the journal.
 	 *
@@ -584,6 +592,8 @@ txn_commit_nop(struct txn *txn)
 int
 txn_commit_async(struct txn *txn)
 {
+	struct journal_entry *req;
+
 	if (txn_prepare(txn) != 0) {
 		txn_rollback(txn);
 		return -1;
@@ -592,12 +602,20 @@ txn_commit_async(struct txn *txn)
 	if (txn_commit_nop(txn))
 		return 0;
 
-	return txn_write_to_wal(txn);
+	req = txn_journal_entry_new(txn);
+	if (req == NULL) {
+		txn_rollback(txn);
+		return -1;
+	}
+
+	return txn_write_to_wal(req);
 }
 
 int
 txn_commit(struct txn *txn)
 {
+	struct journal_entry *req;
+
 	txn->fiber = fiber();
 
 	if (txn_prepare(txn) != 0) {
@@ -611,7 +629,14 @@ txn_commit(struct txn *txn)
 		return 0;
 	}
 
-	if (txn_write_to_wal(txn) != 0)
+	req = txn_journal_entry_new(txn);
+	if (req == NULL) {
+		txn_rollback(txn);
+		txn_free(txn);
+		return -1;
+	}
+
+	if (txn_write_to_wal(req) != 0)
 		return -1;
 
 	/*
-- 
2.20.1

^ permalink raw reply	[flat|nested] 17+ messages in thread

* [Tarantool-patches] [PATCH v13 11/11] box/journal: redesign journal operations
  2020-03-19  9:05 [Tarantool-patches] [PATCH v13 00/11] box/journal: redesign sync and async writes Cyrill Gorcunov
                   ` (9 preceding siblings ...)
  2020-03-19  9:05 ` [Tarantool-patches] [PATCH v13 10/11] box/txn: move journal allocation into separate routine Cyrill Gorcunov
@ 2020-03-19  9:05 ` Cyrill Gorcunov
  2020-03-19 10:37   ` Konstantin Osipov
  10 siblings, 1 reply; 17+ messages in thread
From: Cyrill Gorcunov @ 2020-03-19  9:05 UTC (permalink / raw)
  To: tml

Currently the journal provides only one method -- write,
which implies a callback to trigger upon write completion
(in contrary with 1.10 series where all commits were
processing in synchronous way).

Lets make difference between sync and async writes more
notable: provide journal::write_async method which takes
a callback data as an argument, while journal:write handle
transaction in synchronous way.

Redesing notes:

1) The callback for async write set once in journal
   creation. There is no need to carry callback in
   every journal entry. This allows us to save some
   memory

2) Both journal_write and journal_write_async require
   the caller to pass valid fiber->storage.txn, on
   error the txn must not be reset but preserved so
   callers would be able to run txn_rollback

3) txn_commit and txn_commit_async call txn_rollback
   where appropriate

4) no need to call journal_entry_complete on sync
   writes anymore

5) wal_write_in_wal_mode_none is too long, renamed
   to wal_write_none

Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
 src/box/box.cc    |  6 +--
 src/box/journal.c | 39 +++++++++++++++-----
 src/box/journal.h | 94 +++++++++++++++++++++++++++++++++--------------
 src/box/txn.c     | 53 ++++++++++++--------------
 src/box/vy_log.c  |  4 +-
 src/box/wal.c     | 74 ++++++++++++++++++++++++++++++++-----
 6 files changed, 191 insertions(+), 79 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index cf79affca..87ddbeb3a 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -322,7 +322,6 @@ recovery_journal_write(struct journal *base,
 {
 	struct recovery_journal *journal = (struct recovery_journal *) base;
 	entry->res = vclock_sum(journal->vclock);
-	journal_entry_complete(entry);
 	return 0;
 }
 
@@ -330,8 +329,9 @@ static void
 recovery_journal_create(struct vclock *v)
 {
 	static struct recovery_journal journal;
-
-	journal_create(&journal.base, recovery_journal_write, NULL);
+	journal_create(&journal.base, journal_no_write_async,
+		       journal_no_write_async_cb,
+		       recovery_journal_write, NULL);
 	journal.vclock = v;
 	journal_set(&journal.base);
 }
diff --git a/src/box/journal.c b/src/box/journal.c
index 11e78990d..036aad87a 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -32,6 +32,29 @@
 #include <small/region.h>
 #include <diag.h>
 
+int
+journal_no_write_async(struct journal *journal,
+		       struct journal_entry *entry,
+		       void *complete_data)
+{
+	(void)complete_data;
+	(void)journal;
+
+	say_error("journal: write_async invalid context");
+	entry->res = -1;
+	return -1;
+}
+
+void
+journal_no_write_async_cb(struct journal_entry *entry,
+			  void *complete_data)
+{
+	(void)complete_data;
+
+	say_error("journal: write_async_cb invalid context");
+	entry->res = -1;
+}
+
 /**
  * Used to load from a memtx snapshot. LSN is not used,
  * but txn_commit() must work.
@@ -41,21 +64,19 @@ dummy_journal_write(struct journal *journal, struct journal_entry *entry)
 {
 	(void) journal;
 	entry->res = 0;
-	journal_entry_complete(entry);
 	return 0;
 }
 
 static struct journal dummy_journal = {
-	dummy_journal_write,
-	NULL,
+	.write_async	= journal_no_write_async,
+	.write_async_cb	= journal_no_write_async_cb,
+	.write		= dummy_journal_write,
 };
 
 struct journal *current_journal = &dummy_journal;
 
 struct journal_entry *
-journal_entry_new(size_t n_rows, struct region *region,
-		  journal_entry_complete_cb on_complete_cb,
-		  void *on_complete_cb_data)
+journal_entry_new(size_t n_rows, struct region *region)
 {
 	struct journal_entry *entry;
 
@@ -68,11 +89,11 @@ journal_entry_new(size_t n_rows, struct region *region,
 		diag_set(OutOfMemory, size, "region", "struct journal_entry");
 		return NULL;
 	}
+
+	entry->complete_data = NULL;
 	entry->approx_len = 0;
 	entry->n_rows = n_rows;
 	entry->res = -1;
-	entry->on_complete_cb = on_complete_cb;
-	entry->on_complete_cb_data = on_complete_cb_data;
+
 	return entry;
 }
-
diff --git a/src/box/journal.h b/src/box/journal.h
index 64f167c6f..e1947edd1 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -34,6 +34,7 @@
 #include <stdbool.h>
 #include "salad/stailq.h"
 #include "fiber.h"
+#include "txn.h"
 
 #if defined(__cplusplus)
 extern "C" {
@@ -60,17 +61,10 @@ struct journal_entry {
 	 * the committed transaction, on error is -1
 	 */
 	int64_t res;
-	/**
-	 * A journal entry finalization callback which is going to be called
-	 * after the entry processing was finished in both cases: success
-	 * or fail. Entry->res is set to a result value before the callback
-	 * is fired.
-	 */
-	journal_entry_complete_cb on_complete_cb;
 	/**
 	 * A journal entry completion callback argument.
 	 */
-	void *on_complete_cb_data;
+	void *complete_data;
 	/**
 	 * Approximate size of this request when encoded.
 	 */
@@ -93,18 +87,7 @@ struct region;
  * @return NULL if out of memory, fiber diagnostics area is set
  */
 struct journal_entry *
-journal_entry_new(size_t n_rows, struct region *region,
-		  journal_entry_complete_cb on_complete_cb,
-		  void *on_complete_cb_data);
-
-/**
- * Finalize a single entry.
- */
-static inline void
-journal_entry_complete(struct journal_entry *entry)
-{
-	entry->on_complete_cb(entry, entry->on_complete_cb_data);
-}
+journal_entry_new(size_t n_rows, struct region *region);
 
 /**
  * An API for an abstract journal for all transactions of this
@@ -112,11 +95,33 @@ journal_entry_complete(struct journal_entry *entry)
  * synchronous replication.
  */
 struct journal {
+	/** Asynchronous write */
+	int (*write_async)(struct journal *journal,
+			   struct journal_entry *entry,
+			   void *complete_data);
+
+	/** Asynchronous write completion */
+	void (*write_async_cb)(struct journal_entry *entry,
+			       void *complete_data);
+
+	/** Synchronous write */
 	int (*write)(struct journal *journal,
-		     struct journal_entry *req);
+		     struct journal_entry *entry);
+
+	/** Journal destroy */
 	void (*destroy)(struct journal *journal);
 };
 
+/**
+ * Finalize a single entry.
+ */
+static inline void
+journal_async_complete(struct journal *journal, struct journal_entry *entry)
+{
+	assert(journal->write_async_cb != NULL);
+	journal->write_async_cb(entry, entry->complete_data);
+}
+
 /**
  * Depending on the step of recovery and instance configuration
  * points at a concrete implementation of the journal.
@@ -124,16 +129,30 @@ struct journal {
 extern struct journal *current_journal;
 
 /**
- * Send a single entry to write.
+ * Write a single entry to the journal in synchronous way.
  *
- * @return 0 if write was scheduled or -1 in case of an error.
+ * @return 0 if write was processed by a backend or -1 in case of an error.
  */
 static inline int
 journal_write(struct journal_entry *entry)
 {
+	assert(in_txn() != NULL);
 	return current_journal->write(current_journal, entry);
 }
 
+/**
+ * Queue a single entry to the journal in asynchronous way.
+ *
+ * @return 0 if write was queued to a backend or -1 in case of an error.
+ */
+static inline int
+journal_write_async(struct journal_entry *entry, void *complete_data)
+{
+	assert(in_txn() != NULL);
+	return current_journal->write_async(current_journal, entry,
+					    complete_data);
+}
+
 /**
  * Change the current implementation of the journaling API.
  * Happens during life cycle of an instance:
@@ -165,13 +184,34 @@ journal_set(struct journal *new_journal)
 
 static inline void
 journal_create(struct journal *journal,
-	       int (*write)(struct journal *, struct journal_entry *),
-	       void (*destroy)(struct journal *))
+	       int (*write_async)(struct journal *journal,
+				  struct journal_entry *entry,
+				  void *complete_data),
+	       void (*write_async_cb)(struct journal_entry *entry,
+				      void *complete_data),
+	       int (*write)(struct journal *journal,
+			    struct journal_entry *entry),
+	       void (*destroy)(struct journal *journal))
 {
-	journal->write = write;
-	journal->destroy = destroy;
+	journal->write_async	= write_async;
+	journal->write_async_cb	= write_async_cb;
+	journal->write		= write;
+	journal->destroy	= destroy;
 }
 
+/**
+ * A stub to issue an error in case if asynchronous
+ * write is diabled in the backend.
+ */
+extern int
+journal_no_write_async(struct journal *journal,
+		       struct journal_entry *entry,
+		       void *complete_data);
+
+extern void
+journal_no_write_async_cb(struct journal_entry *entry,
+			  void *complete_data);
+
 static inline bool
 journal_is_initialized(struct journal *journal)
 {
diff --git a/src/box/txn.c b/src/box/txn.c
index 11c20aceb..33302586d 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -487,7 +487,7 @@ txn_journal_entry_new(struct txn *txn)
 	assert(txn->n_new_rows + txn->n_applier_rows > 0);
 
 	req = journal_entry_new(txn->n_new_rows + txn->n_applier_rows,
-				&txn->region, txn_complete_async, txn);
+				&txn->region);
 	if (req == NULL)
 		return NULL;
 
@@ -518,24 +518,6 @@ txn_journal_entry_new(struct txn *txn)
 	return req;
 }
 
-static int64_t
-txn_write_to_wal(struct journal_entry *req)
-{
-	/*
-	 * Send the entry to the journal.
-	 *
-	 * After this point the transaction must not be used
-	 * so reset the corresponding key in the fiber storage.
-	 */
-	fiber_set_txn(fiber(), NULL);
-	if (journal_write(req) < 0) {
-		diag_set(ClientError, ER_WAL_IO);
-		diag_log();
-		return -1;
-	}
-	return 0;
-}
-
 /*
  * Prepare a transaction using engines.
  */
@@ -608,7 +590,15 @@ txn_commit_async(struct txn *txn)
 		return -1;
 	}
 
-	return txn_write_to_wal(req);
+	if (journal_write_async(req, txn) != 0) {
+		txn_rollback(txn);
+
+		diag_set(ClientError, ER_WAL_IO);
+		diag_log();
+		return -1;
+	}
+
+	return 0;
 }
 
 int
@@ -636,21 +626,26 @@ txn_commit(struct txn *txn)
 		return -1;
 	}
 
-	if (txn_write_to_wal(req) != 0)
+	if (journal_write(req) != 0) {
+		txn_rollback(txn);
+		txn_free(txn);
+
+		diag_set(ClientError, ER_WAL_IO);
+		diag_log();
 		return -1;
+	}
 
-	/*
-	 * In case of non-yielding journal the transaction could already
-	 * be done and there is nothing to wait in such cases.
-	 */
 	if (!txn_has_flag(txn, TXN_IS_DONE)) {
-		bool cancellable = fiber_set_cancellable(false);
-		fiber_yield();
-		fiber_set_cancellable(cancellable);
+		txn->signature = req->res;
+		txn_complete(txn);
+		fiber_set_txn(fiber(), NULL);
 	}
+
 	int res = txn->signature >= 0 ? 0 : -1;
-	if (res != 0)
+	if (res != 0) {
 		diag_set(ClientError, ER_WAL_IO);
+		diag_log();
+	}
 
 	/* Synchronous transactions are freed by the calling fiber. */
 	txn_free(txn);
diff --git a/src/box/vy_log.c b/src/box/vy_log.c
index cb291f3c8..92171ec21 100644
--- a/src/box/vy_log.c
+++ b/src/box/vy_log.c
@@ -815,8 +815,8 @@ vy_log_tx_flush(struct vy_log_tx *tx)
 		tx_size++;
 
 	size_t used = region_used(&fiber()->gc);
-	struct journal_entry *entry = journal_entry_new(tx_size, &fiber()->gc,
-							NULL, NULL);
+
+	struct journal_entry *entry = journal_entry_new(tx_size, &fiber()->gc);
 	if (entry == NULL)
 		goto err;
 
diff --git a/src/box/wal.c b/src/box/wal.c
index 1668c9348..ba9f22e7a 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -32,6 +32,7 @@
 
 #include "vclock.h"
 #include "fiber.h"
+#include "txn.h"
 #include "fio.h"
 #include "errinj.h"
 #include "error.h"
@@ -60,11 +61,17 @@ const char *wal_mode_STRS[] = { "none", "write", "fsync", NULL };
 
 int wal_dir_lock = -1;
 
+static int
+wal_write_async(struct journal *, struct journal_entry *, void *);
+
 static int
 wal_write(struct journal *, struct journal_entry *);
 
 static int
-wal_write_in_wal_mode_none(struct journal *, struct journal_entry *);
+wal_write_none_async(struct journal *, struct journal_entry *, void *);
+
+static int
+wal_write_none(struct journal *, struct journal_entry *);
 
 /*
  * WAL writer - maintain a Write Ahead Log for every change
@@ -253,9 +260,10 @@ xlog_write_entry(struct xlog *l, struct journal_entry *entry)
 static void
 tx_schedule_queue(struct stailq *queue)
 {
+	struct wal_writer *writer = &wal_writer_singleton;
 	struct journal_entry *req, *tmp;
 	stailq_foreach_entry_safe(req, tmp, queue, fifo)
-		journal_entry_complete(req);
+		journal_async_complete(&writer->base, req);
 }
 
 /**
@@ -349,8 +357,14 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
 {
 	writer->wal_mode = wal_mode;
 	writer->wal_max_size = wal_max_size;
-	journal_create(&writer->base, wal_mode == WAL_NONE ?
-		       wal_write_in_wal_mode_none : wal_write, NULL);
+
+	journal_create(&writer->base,
+		       wal_mode == WAL_NONE ?
+		       wal_write_none_async : wal_write_async,
+		       txn_complete_async,
+		       wal_mode == WAL_NONE ?
+		       wal_write_none : wal_write,
+		       NULL);
 
 	struct xlog_opts opts = xlog_opts_default;
 	opts.sync_is_async = true;
@@ -1170,9 +1184,18 @@ wal_writer_f(va_list ap)
  * to be written to disk.
  */
 static int
-wal_write(struct journal *journal, struct journal_entry *entry)
+wal_write_async(struct journal *journal, struct journal_entry *entry,
+		void *complete_data)
 {
 	struct wal_writer *writer = (struct wal_writer *) journal;
+	entry->complete_data = complete_data;
+	struct txn *txn = complete_data;
+
+	/*
+	 * After this point the transaction should not
+	 * be bound to the fiber, it handled by a callback.
+	 */
+	fiber_set_txn(fiber(), NULL);
 
 	ERROR_INJECT(ERRINJ_WAL_IO, {
 		goto fail;
@@ -1220,14 +1243,39 @@ wal_write(struct journal *journal, struct journal_entry *entry)
 	return 0;
 
 fail:
+	/*
+	 * Don't forget to restore transaction
+	 * in a fiber storage: the caller should
+	 * be able to run a rollback procedure.
+	 */
+	fiber_set_txn(fiber(), txn);
 	entry->res = -1;
-	journal_entry_complete(entry);
 	return -1;
 }
 
 static int
-wal_write_in_wal_mode_none(struct journal *journal,
-			   struct journal_entry *entry)
+wal_write(struct journal *journal, struct journal_entry *entry)
+{
+	struct txn *txn = in_txn();
+
+	/*
+	 * We can reuse async WAL engine transparently
+	 * to the caller.
+	 */
+	if (wal_write_async(journal, entry, txn) != 0)
+		return -1;
+
+	bool cancellable = fiber_set_cancellable(false);
+	fiber_yield();
+	fiber_set_cancellable(cancellable);
+
+	return 0;
+}
+
+static int
+wal_write_none_async(struct journal *journal,
+		     struct journal_entry *entry,
+		     void *complete_data)
 {
 	struct wal_writer *writer = (struct wal_writer *) journal;
 	struct vclock vclock_diff;
@@ -1237,10 +1285,18 @@ wal_write_in_wal_mode_none(struct journal *journal,
 	vclock_merge(&writer->vclock, &vclock_diff);
 	vclock_copy(&replicaset.vclock, &writer->vclock);
 	entry->res = vclock_sum(&writer->vclock);
-	journal_entry_complete(entry);
+
+	(void)complete_data;
+	fiber_set_txn(fiber(), NULL);
 	return 0;
 }
 
+static int
+wal_write_none(struct journal *journal, struct journal_entry *entry)
+{
+	return wal_write_none_async(journal, entry, NULL);
+}
+
 void
 wal_init_vy_log()
 {
-- 
2.20.1

^ permalink raw reply	[flat|nested] 17+ messages in thread

* Re: [Tarantool-patches] [PATCH v13 11/11] box/journal: redesign journal operations
  2020-03-19  9:05 ` [Tarantool-patches] [PATCH v13 11/11] box/journal: redesign journal operations Cyrill Gorcunov
@ 2020-03-19 10:37   ` Konstantin Osipov
  2020-03-19 10:49     ` Cyrill Gorcunov
  0 siblings, 1 reply; 17+ messages in thread
From: Konstantin Osipov @ 2020-03-19 10:37 UTC (permalink / raw)
  To: Cyrill Gorcunov; +Cc: tml

* Cyrill Gorcunov <gorcunov@gmail.com> [20/03/19 12:12]:
> --- a/src/box/wal.c
> +++ b/src/box/wal.c
> @@ -32,6 +32,7 @@
>  
>  #include "vclock.h"
>  #include "fiber.h"
> +#include "txn.h"

Please try to avoid including txn.h here.

You can do it, I'm pretty sure.

>  #include "fio.h"
>  #include "errinj.h"
>  #include "error.h"
> @@ -60,11 +61,17 @@ const char *wal_mode_STRS[] = { "none", "write", "fsync", NULL };
>  

-- 
Konstantin Osipov, Moscow, Russia

^ permalink raw reply	[flat|nested] 17+ messages in thread

* Re: [Tarantool-patches] [PATCH v13 11/11] box/journal: redesign journal operations
  2020-03-19 10:37   ` Konstantin Osipov
@ 2020-03-19 10:49     ` Cyrill Gorcunov
  2020-03-19 11:12       ` Konstantin Osipov
  0 siblings, 1 reply; 17+ messages in thread
From: Cyrill Gorcunov @ 2020-03-19 10:49 UTC (permalink / raw)
  To: Konstantin Osipov, tml

On Thu, Mar 19, 2020 at 01:37:49PM +0300, Konstantin Osipov wrote:
> * Cyrill Gorcunov <gorcunov@gmail.com> [20/03/19 12:12]:
> > --- a/src/box/wal.c
> > +++ b/src/box/wal.c
> > @@ -32,6 +32,7 @@
> >  
> >  #include "vclock.h"
> >  #include "fiber.h"
> > +#include "txn.h"
> 
> Please try to avoid including txn.h here.
> 
> You can do it, I'm pretty sure.

I use fiber_set_txn and in_txn helpers, they
are declared in txn.h.

The base idea is that setting fiber->storage.txn
to NULL is done inside the wal engine and should
be transparent to the caller (which implies that
fiber->storage.txn is preserved in case of error
and txn_rollback call is allowed).

Also the sync write in wal engine uses async write
internally, so I fetch the current txn via in_txn
and pass it as a completion data.

^ permalink raw reply	[flat|nested] 17+ messages in thread

* Re: [Tarantool-patches] [PATCH v13 11/11] box/journal: redesign journal operations
  2020-03-19 10:49     ` Cyrill Gorcunov
@ 2020-03-19 11:12       ` Konstantin Osipov
  2020-03-19 11:17         ` Cyrill Gorcunov
  2020-03-19 16:20         ` Cyrill Gorcunov
  0 siblings, 2 replies; 17+ messages in thread
From: Konstantin Osipov @ 2020-03-19 11:12 UTC (permalink / raw)
  To: Cyrill Gorcunov; +Cc: tml

* Cyrill Gorcunov <gorcunov@gmail.com> [20/03/19 13:51]:

I really it's not worth it, adding a cicrular dependency.

you can declare struct txn in wal.cc, and pass it around by
pointer without using the header.

The whole goal of this refactoring was to move fiber_set_txn() 
outside wal.cc, to txn.cc.

Is it still impossible? let's discuss.

Module boundaries are really important long term. You add it for a
valid reason, and some other patch will begin using txn inside wal
for a silly reason.

> On Thu, Mar 19, 2020 at 01:37:49PM +0300, Konstantin Osipov wrote:
> > * Cyrill Gorcunov <gorcunov@gmail.com> [20/03/19 12:12]:
> > > --- a/src/box/wal.c
> > > +++ b/src/box/wal.c
> > > @@ -32,6 +32,7 @@
> > >  
> > >  #include "vclock.h"
> > >  #include "fiber.h"
> > > +#include "txn.h"
> > 
> > Please try to avoid including txn.h here.
> > 
> > You can do it, I'm pretty sure.
> 
> I use fiber_set_txn and in_txn helpers, they
> are declared in txn.h.
> 
> The base idea is that setting fiber->storage.txn
> to NULL is done inside the wal engine and should
> be transparent to the caller (which implies that
> fiber->storage.txn is preserved in case of error
> and txn_rollback call is allowed).
> 
> Also the sync write in wal engine uses async write
> internally, so I fetch the current txn via in_txn
> and pass it as a completion data.

-- 
Konstantin Osipov, Moscow, Russia

^ permalink raw reply	[flat|nested] 17+ messages in thread

* Re: [Tarantool-patches] [PATCH v13 11/11] box/journal: redesign journal operations
  2020-03-19 11:12       ` Konstantin Osipov
@ 2020-03-19 11:17         ` Cyrill Gorcunov
  2020-03-19 16:20         ` Cyrill Gorcunov
  1 sibling, 0 replies; 17+ messages in thread
From: Cyrill Gorcunov @ 2020-03-19 11:17 UTC (permalink / raw)
  To: Konstantin Osipov, tml

On Thu, Mar 19, 2020 at 02:12:06PM +0300, Konstantin Osipov wrote:
> * Cyrill Gorcunov <gorcunov@gmail.com> [20/03/19 13:51]:
> 
> I really it's not worth it, adding a cicrular dependency.
> 
> you can declare struct txn in wal.cc, and pass it around by
> pointer without using the header.
> 
> The whole goal of this refactoring was to move fiber_set_txn() 
> outside wal.cc, to txn.cc.
> 
> Is it still impossible? let's discuss.

Kostya, letme try. I think I can move txn related stuff to
the journal.c, and wal.c won't need it. Gimme some time...

> 
> Module boundaries are really important long term. You add it for a
> valid reason, and some other patch will begin using txn inside wal
> for a silly reason.

Yes, I know what is bothering you and completely agree.

^ permalink raw reply	[flat|nested] 17+ messages in thread

* Re: [Tarantool-patches] [PATCH v13 11/11] box/journal: redesign journal operations
  2020-03-19 11:12       ` Konstantin Osipov
  2020-03-19 11:17         ` Cyrill Gorcunov
@ 2020-03-19 16:20         ` Cyrill Gorcunov
  1 sibling, 0 replies; 17+ messages in thread
From: Cyrill Gorcunov @ 2020-03-19 16:20 UTC (permalink / raw)
  To: Konstantin Osipov, tml

On Thu, Mar 19, 2020 at 02:12:06PM +0300, Konstantin Osipov wrote:
> * Cyrill Gorcunov <gorcunov@gmail.com> [20/03/19 13:51]:
> 
> I really it's not worth it, adding a cicrular dependency.
> 
> you can declare struct txn in wal.cc, and pass it around by
> pointer without using the header.
> 
> The whole goal of this refactoring was to move fiber_set_txn() 
> outside wal.cc, to txn.cc.
> 
> Is it still impossible? let's discuss.
> 
> Module boundaries are really important long term. You add it for a
> valid reason, and some other patch will begin using txn inside wal
> for a silly reason.

Kostya, take a look please, does it look better? I had to drop passing
completion data outside but assign it to the entry, this is close what
Gosha has except more explicit I think. Same time in wal_init I need
to pass write_async_cb function so the journal would save it internally
and won't waste memory on every journal entry.

The whole series sits in gorcunov/gh-4031-txn_write_to_wal-14

---
From 39334ef60ad8101546bc7a177b07eed10b0c3d65 Mon Sep 17 00:00:00 2001
From: Cyrill Gorcunov <gorcunov@gmail.com>
Date: Wed, 18 Mar 2020 22:36:37 +0300
Subject: [PATCH] box/journal: redesign journal operations

Currently the journal provides only one method -- write,
which implies a callback to trigger upon write completion
(in contrary with 1.10 series where all commits were
processing in synchronous way).

Lets make difference between sync and async writes more
notable: provide journal::write_async method which runs
completion function once entry is written, in turn
journal:write handle transaction in synchronous way.

Redesing notes:

1) The callback for async write set once in journal
   creation. There is no need to carry callback in
   every journal entry. This allows us to save some
   memory;

2) txn_commit and txn_commit_async call txn_rollback
   where appropriate;

3) no need to call journal_entry_complete on sync
   writes anymore;

4) wal_write_in_wal_mode_none is too long, renamed
   to wal_write_none;

5) wal engine use async writes internally but it is
   transparent to callers.

Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
 src/box/box.cc    |  8 ++---
 src/box/journal.c | 33 +++++++++++++-----
 src/box/journal.h | 87 +++++++++++++++++++++++++++++++----------------
 src/box/txn.c     | 61 ++++++++++++++++-----------------
 src/box/txn.h     |  2 +-
 src/box/vy_log.c  |  5 +--
 src/box/wal.c     | 61 ++++++++++++++++++++++++++-------
 src/box/wal.h     |  4 +--
 8 files changed, 172 insertions(+), 89 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index cf79affca..3a3bda78e 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -322,7 +322,6 @@ recovery_journal_write(struct journal *base,
 {
 	struct recovery_journal *journal = (struct recovery_journal *) base;
 	entry->res = vclock_sum(journal->vclock);
-	journal_entry_complete(entry);
 	return 0;
 }
 
@@ -330,8 +329,9 @@ static void
 recovery_journal_create(struct vclock *v)
 {
 	static struct recovery_journal journal;
-
-	journal_create(&journal.base, recovery_journal_write, NULL);
+	journal_create(&journal.base, journal_no_write_async,
+		       journal_no_write_async_cb,
+		       recovery_journal_write, NULL);
 	journal.vclock = v;
 	journal_set(&journal.base);
 }
@@ -2353,7 +2353,7 @@ box_cfg_xc(void)
 
 	int64_t wal_max_size = box_check_wal_max_size(cfg_geti64("wal_max_size"));
 	enum wal_mode wal_mode = box_check_wal_mode(cfg_gets("wal_mode"));
-	if (wal_init(wal_mode, cfg_gets("wal_dir"),
+	if (wal_init(wal_mode, txn_complete_async, cfg_gets("wal_dir"),
 		     wal_max_size, &INSTANCE_UUID, on_wal_garbage_collection,
 		     on_wal_checkpoint_threshold) != 0) {
 		diag_raise();
diff --git a/src/box/journal.c b/src/box/journal.c
index 11e78990d..6c4a0850a 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -32,6 +32,24 @@
 #include <small/region.h>
 #include <diag.h>
 
+int
+journal_no_write_async(struct journal *journal,
+		       struct journal_entry *entry)
+{
+	(void)journal;
+
+	say_error("journal: write_async invalid context");
+	entry->res = -1;
+	return -1;
+}
+
+void
+journal_no_write_async_cb(struct journal_entry *entry)
+{
+	say_error("journal: write_async_cb invalid context");
+	entry->res = -1;
+}
+
 /**
  * Used to load from a memtx snapshot. LSN is not used,
  * but txn_commit() must work.
@@ -41,21 +59,20 @@ dummy_journal_write(struct journal *journal, struct journal_entry *entry)
 {
 	(void) journal;
 	entry->res = 0;
-	journal_entry_complete(entry);
 	return 0;
 }
 
 static struct journal dummy_journal = {
-	dummy_journal_write,
-	NULL,
+	.write_async	= journal_no_write_async,
+	.write_async_cb	= journal_no_write_async_cb,
+	.write		= dummy_journal_write,
 };
 
 struct journal *current_journal = &dummy_journal;
 
 struct journal_entry *
 journal_entry_new(size_t n_rows, struct region *region,
-		  journal_entry_complete_cb on_complete_cb,
-		  void *on_complete_cb_data)
+		  void *complete_data)
 {
 	struct journal_entry *entry;
 
@@ -68,11 +85,11 @@ journal_entry_new(size_t n_rows, struct region *region,
 		diag_set(OutOfMemory, size, "region", "struct journal_entry");
 		return NULL;
 	}
+
+	entry->complete_data = complete_data;
 	entry->approx_len = 0;
 	entry->n_rows = n_rows;
 	entry->res = -1;
-	entry->on_complete_cb = on_complete_cb;
-	entry->on_complete_cb_data = on_complete_cb_data;
+
 	return entry;
 }
-
diff --git a/src/box/journal.h b/src/box/journal.h
index 64f167c6f..045302906 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -34,6 +34,7 @@
 #include <stdbool.h>
 #include "salad/stailq.h"
 #include "fiber.h"
+#include "txn.h"
 
 #if defined(__cplusplus)
 extern "C" {
@@ -42,9 +43,6 @@ extern "C" {
 struct xrow_header;
 struct journal_entry;
 
-/** Journal entry finalization callback typedef. */
-typedef void (*journal_entry_complete_cb)(struct journal_entry *entry, void *data);
-
 /**
  * An entry for an abstract journal.
  * Simply put, a write ahead log request.
@@ -60,17 +58,10 @@ struct journal_entry {
 	 * the committed transaction, on error is -1
 	 */
 	int64_t res;
-	/**
-	 * A journal entry finalization callback which is going to be called
-	 * after the entry processing was finished in both cases: success
-	 * or fail. Entry->res is set to a result value before the callback
-	 * is fired.
-	 */
-	journal_entry_complete_cb on_complete_cb;
 	/**
 	 * A journal entry completion callback argument.
 	 */
-	void *on_complete_cb_data;
+	void *complete_data;
 	/**
 	 * Approximate size of this request when encoded.
 	 */
@@ -94,17 +85,7 @@ struct region;
  */
 struct journal_entry *
 journal_entry_new(size_t n_rows, struct region *region,
-		  journal_entry_complete_cb on_complete_cb,
-		  void *on_complete_cb_data);
-
-/**
- * Finalize a single entry.
- */
-static inline void
-journal_entry_complete(struct journal_entry *entry)
-{
-	entry->on_complete_cb(entry, entry->on_complete_cb_data);
-}
+		  void *complete_data);
 
 /**
  * An API for an abstract journal for all transactions of this
@@ -112,11 +93,31 @@ journal_entry_complete(struct journal_entry *entry)
  * synchronous replication.
  */
 struct journal {
+	/** Asynchronous write */
+	int (*write_async)(struct journal *journal,
+			   struct journal_entry *entry);
+
+	/** Asynchronous write completion */
+	void (*write_async_cb)(struct journal_entry *entry);
+
+	/** Synchronous write */
 	int (*write)(struct journal *journal,
-		     struct journal_entry *req);
+		     struct journal_entry *entry);
+
+	/** Journal destroy */
 	void (*destroy)(struct journal *journal);
 };
 
+/**
+ * Finalize a single entry.
+ */
+static inline void
+journal_async_complete(struct journal *journal, struct journal_entry *entry)
+{
+	assert(journal->write_async_cb != NULL);
+	journal->write_async_cb(entry);
+}
+
 /**
  * Depending on the step of recovery and instance configuration
  * points at a concrete implementation of the journal.
@@ -124,9 +125,9 @@ struct journal {
 extern struct journal *current_journal;
 
 /**
- * Send a single entry to write.
+ * Write a single entry to the journal in synchronous way.
  *
- * @return 0 if write was scheduled or -1 in case of an error.
+ * @return 0 if write was processed by a backend or -1 in case of an error.
  */
 static inline int
 journal_write(struct journal_entry *entry)
@@ -134,6 +135,17 @@ journal_write(struct journal_entry *entry)
 	return current_journal->write(current_journal, entry);
 }
 
+/**
+ * Queue a single entry to the journal in asynchronous way.
+ *
+ * @return 0 if write was queued to a backend or -1 in case of an error.
+ */
+static inline int
+journal_write_async(struct journal_entry *entry)
+{
+	return current_journal->write_async(current_journal, entry);
+}
+
 /**
  * Change the current implementation of the journaling API.
  * Happens during life cycle of an instance:
@@ -165,13 +177,30 @@ journal_set(struct journal *new_journal)
 
 static inline void
 journal_create(struct journal *journal,
-	       int (*write)(struct journal *, struct journal_entry *),
-	       void (*destroy)(struct journal *))
+	       int (*write_async)(struct journal *journal,
+				  struct journal_entry *entry),
+	       void (*write_async_cb)(struct journal_entry *entry),
+	       int (*write)(struct journal *journal,
+			    struct journal_entry *entry),
+	       void (*destroy)(struct journal *journal))
 {
-	journal->write = write;
-	journal->destroy = destroy;
+	journal->write_async	= write_async;
+	journal->write_async_cb	= write_async_cb;
+	journal->write		= write;
+	journal->destroy	= destroy;
 }
 
+/**
+ * A stub to issue an error in case if asynchronous
+ * write is diabled in the backend.
+ */
+extern int
+journal_no_write_async(struct journal *journal,
+		       struct journal_entry *entry);
+
+extern void
+journal_no_write_async_cb(struct journal_entry *entry);
+
 static inline bool
 journal_is_initialized(struct journal *journal)
 {
diff --git a/src/box/txn.c b/src/box/txn.c
index 11c20aceb..b42df3df6 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -463,9 +463,9 @@ txn_complete(struct txn *txn)
 }
 
 void
-txn_complete_async(struct journal_entry *entry, void *complete_data)
+txn_complete_async(struct journal_entry *entry)
 {
-	struct txn *txn = complete_data;
+	struct txn *txn = entry->complete_data;
 	txn->signature = entry->res;
 	/*
 	 * Some commit/rollback triggers require for in_txn fiber
@@ -487,7 +487,7 @@ txn_journal_entry_new(struct txn *txn)
 	assert(txn->n_new_rows + txn->n_applier_rows > 0);
 
 	req = journal_entry_new(txn->n_new_rows + txn->n_applier_rows,
-				&txn->region, txn_complete_async, txn);
+				&txn->region, txn);
 	if (req == NULL)
 		return NULL;
 
@@ -518,24 +518,6 @@ txn_journal_entry_new(struct txn *txn)
 	return req;
 }
 
-static int64_t
-txn_write_to_wal(struct journal_entry *req)
-{
-	/*
-	 * Send the entry to the journal.
-	 *
-	 * After this point the transaction must not be used
-	 * so reset the corresponding key in the fiber storage.
-	 */
-	fiber_set_txn(fiber(), NULL);
-	if (journal_write(req) < 0) {
-		diag_set(ClientError, ER_WAL_IO);
-		diag_log();
-		return -1;
-	}
-	return 0;
-}
-
 /*
  * Prepare a transaction using engines.
  */
@@ -608,7 +590,17 @@ txn_commit_async(struct txn *txn)
 		return -1;
 	}
 
-	return txn_write_to_wal(req);
+	fiber_set_txn(fiber(), NULL);
+	if (journal_write_async(req) != 0) {
+		fiber_set_txn(fiber(), txn);
+		txn_rollback(txn);
+
+		diag_set(ClientError, ER_WAL_IO);
+		diag_log();
+		return -1;
+	}
+
+	return 0;
 }
 
 int
@@ -636,21 +628,28 @@ txn_commit(struct txn *txn)
 		return -1;
 	}
 
-	if (txn_write_to_wal(req) != 0)
+	fiber_set_txn(fiber(), NULL);
+	if (journal_write(req) != 0) {
+		fiber_set_txn(fiber(), txn);
+		txn_rollback(txn);
+		txn_free(txn);
+
+		diag_set(ClientError, ER_WAL_IO);
+		diag_log();
 		return -1;
+	}
 
-	/*
-	 * In case of non-yielding journal the transaction could already
-	 * be done and there is nothing to wait in such cases.
-	 */
 	if (!txn_has_flag(txn, TXN_IS_DONE)) {
-		bool cancellable = fiber_set_cancellable(false);
-		fiber_yield();
-		fiber_set_cancellable(cancellable);
+		txn->signature = req->res;
+		txn_complete(txn);
+		fiber_set_txn(fiber(), NULL);
 	}
+
 	int res = txn->signature >= 0 ? 0 : -1;
-	if (res != 0)
+	if (res != 0) {
 		diag_set(ClientError, ER_WAL_IO);
+		diag_log();
+	}
 
 	/* Synchronous transactions are freed by the calling fiber. */
 	txn_free(txn);
diff --git a/src/box/txn.h b/src/box/txn.h
index 572c76d84..3f6d79d5c 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -292,7 +292,7 @@ txn_rollback(struct txn *txn);
  * Complete asynchronous transaction.
  */
 void
-txn_complete_async(struct journal_entry *entry, void *complete_data);
+txn_complete_async(struct journal_entry *entry);
 
 /**
  * Submit a transaction to the journal.
diff --git a/src/box/vy_log.c b/src/box/vy_log.c
index cb291f3c8..9ead066af 100644
--- a/src/box/vy_log.c
+++ b/src/box/vy_log.c
@@ -815,8 +815,9 @@ vy_log_tx_flush(struct vy_log_tx *tx)
 		tx_size++;
 
 	size_t used = region_used(&fiber()->gc);
-	struct journal_entry *entry = journal_entry_new(tx_size, &fiber()->gc,
-							NULL, NULL);
+
+	struct journal_entry *entry;
+	entry = journal_entry_new(tx_size, &fiber()->gc, NULL);
 	if (entry == NULL)
 		goto err;
 
diff --git a/src/box/wal.c b/src/box/wal.c
index 1668c9348..3b094b0e8 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -60,11 +60,17 @@ const char *wal_mode_STRS[] = { "none", "write", "fsync", NULL };
 
 int wal_dir_lock = -1;
 
+static int
+wal_write_async(struct journal *, struct journal_entry *);
+
 static int
 wal_write(struct journal *, struct journal_entry *);
 
 static int
-wal_write_in_wal_mode_none(struct journal *, struct journal_entry *);
+wal_write_none_async(struct journal *, struct journal_entry *);
+
+static int
+wal_write_none(struct journal *, struct journal_entry *);
 
 /*
  * WAL writer - maintain a Write Ahead Log for every change
@@ -253,9 +259,10 @@ xlog_write_entry(struct xlog *l, struct journal_entry *entry)
 static void
 tx_schedule_queue(struct stailq *queue)
 {
+	struct wal_writer *writer = &wal_writer_singleton;
 	struct journal_entry *req, *tmp;
 	stailq_foreach_entry_safe(req, tmp, queue, fifo)
-		journal_entry_complete(req);
+		journal_async_complete(&writer->base, req);
 }
 
 /**
@@ -342,6 +349,7 @@ tx_notify_checkpoint(struct cmsg *msg)
  */
 static void
 wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
+		  void (*wall_async_cb)(struct journal_entry *entry),
 		  const char *wal_dirname,
 		  int64_t wal_max_size, const struct tt_uuid *instance_uuid,
 		  wal_on_garbage_collection_f on_garbage_collection,
@@ -349,8 +357,14 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
 {
 	writer->wal_mode = wal_mode;
 	writer->wal_max_size = wal_max_size;
-	journal_create(&writer->base, wal_mode == WAL_NONE ?
-		       wal_write_in_wal_mode_none : wal_write, NULL);
+
+	journal_create(&writer->base,
+		       wal_mode == WAL_NONE ?
+		       wal_write_none_async : wal_write_async,
+		       wall_async_cb,
+		       wal_mode == WAL_NONE ?
+		       wal_write_none : wal_write,
+		       NULL);
 
 	struct xlog_opts opts = xlog_opts_default;
 	opts.sync_is_async = true;
@@ -458,14 +472,14 @@ wal_open(struct wal_writer *writer)
 }
 
 int
-wal_init(enum wal_mode wal_mode, const char *wal_dirname,
-	 int64_t wal_max_size, const struct tt_uuid *instance_uuid,
+wal_init(enum wal_mode wal_mode, void (*wall_async_cb)(struct journal_entry *entry),
+	 const char *wal_dirname, int64_t wal_max_size, const struct tt_uuid *instance_uuid,
 	 wal_on_garbage_collection_f on_garbage_collection,
 	 wal_on_checkpoint_threshold_f on_checkpoint_threshold)
 {
 	/* Initialize the state. */
 	struct wal_writer *writer = &wal_writer_singleton;
-	wal_writer_create(writer, wal_mode, wal_dirname,
+	wal_writer_create(writer, wal_mode, wall_async_cb, wal_dirname,
 			  wal_max_size, instance_uuid, on_garbage_collection,
 			  on_checkpoint_threshold);
 
@@ -1170,7 +1184,7 @@ wal_writer_f(va_list ap)
  * to be written to disk.
  */
 static int
-wal_write(struct journal *journal, struct journal_entry *entry)
+wal_write_async(struct journal *journal, struct journal_entry *entry)
 {
 	struct wal_writer *writer = (struct wal_writer *) journal;
 
@@ -1221,26 +1235,49 @@ wal_write(struct journal *journal, struct journal_entry *entry)
 
 fail:
 	entry->res = -1;
-	journal_entry_complete(entry);
 	return -1;
 }
 
 static int
-wal_write_in_wal_mode_none(struct journal *journal,
-			   struct journal_entry *entry)
+wal_write(struct journal *journal, struct journal_entry *entry)
+{
+	/*
+	 * We can reuse async WAL engine transparently
+	 * to the caller.
+	 */
+	if (wal_write_async(journal, entry) != 0)
+		return -1;
+
+	bool cancellable = fiber_set_cancellable(false);
+	fiber_yield();
+	fiber_set_cancellable(cancellable);
+
+	return 0;
+}
+
+static int
+wal_write_none_async(struct journal *journal,
+		     struct journal_entry *entry)
 {
 	struct wal_writer *writer = (struct wal_writer *) journal;
 	struct vclock vclock_diff;
+
 	vclock_create(&vclock_diff);
 	wal_assign_lsn(&vclock_diff, &writer->vclock, entry->rows,
 		       entry->rows + entry->n_rows);
 	vclock_merge(&writer->vclock, &vclock_diff);
 	vclock_copy(&replicaset.vclock, &writer->vclock);
 	entry->res = vclock_sum(&writer->vclock);
-	journal_entry_complete(entry);
+
 	return 0;
 }
 
+static int
+wal_write_none(struct journal *journal, struct journal_entry *entry)
+{
+	return wal_write_none_async(journal, entry);
+}
+
 void
 wal_init_vy_log()
 {
diff --git a/src/box/wal.h b/src/box/wal.h
index 76b44941a..11a66a20a 100644
--- a/src/box/wal.h
+++ b/src/box/wal.h
@@ -81,8 +81,8 @@ typedef void (*wal_on_checkpoint_threshold_f)(void);
  * Start WAL thread and initialize WAL writer.
  */
 int
-wal_init(enum wal_mode wal_mode, const char *wal_dirname,
-	 int64_t wal_max_size, const struct tt_uuid *instance_uuid,
+wal_init(enum wal_mode wal_mode, void (*wall_async_cb)(struct journal_entry *entry),
+	 const char *wal_dirname, int64_t wal_max_size, const struct tt_uuid *instance_uuid,
 	 wal_on_garbage_collection_f on_garbage_collection,
 	 wal_on_checkpoint_threshold_f on_checkpoint_threshold);
 
-- 
2.20.1

^ permalink raw reply	[flat|nested] 17+ messages in thread

end of thread, other threads:[~2020-03-19 16:20 UTC | newest]

Thread overview: 17+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2020-03-19  9:05 [Tarantool-patches] [PATCH v13 00/11] box/journal: redesign sync and async writes Cyrill Gorcunov
2020-03-19  9:05 ` [Tarantool-patches] [PATCH v13 01/11] box: recovery_journal_create -- set journal here Cyrill Gorcunov
2020-03-19  9:05 ` [Tarantool-patches] [PATCH v13 02/11] box: recovery_journal -- declare it as static Cyrill Gorcunov
2020-03-19  9:05 ` [Tarantool-patches] [PATCH v13 03/11] box/txn: move fiber_set_txn to header Cyrill Gorcunov
2020-03-19  9:05 ` [Tarantool-patches] [PATCH v13 04/11] box/txn: rename txn_write to txn_commit_async Cyrill Gorcunov
2020-03-19  9:05 ` [Tarantool-patches] [PATCH v13 05/11] box/txn: move setup of txn start to txn_prepare Cyrill Gorcunov
2020-03-19  9:05 ` [Tarantool-patches] [PATCH v13 06/11] box/txn: add txn_commit_nop helper Cyrill Gorcunov
2020-03-19  9:05 ` [Tarantool-patches] [PATCH v13 07/11] box/txn: rename txn_entry_complete_cb to txn_complete_async Cyrill Gorcunov
2020-03-19  9:05 ` [Tarantool-patches] [PATCH v13 08/11] box/txn: unweave txn_commit from txn_commit_async Cyrill Gorcunov
2020-03-19  9:05 ` [Tarantool-patches] [PATCH v13 09/11] box/txn: clear fiber storage right before journal write Cyrill Gorcunov
2020-03-19  9:05 ` [Tarantool-patches] [PATCH v13 10/11] box/txn: move journal allocation into separate routine Cyrill Gorcunov
2020-03-19  9:05 ` [Tarantool-patches] [PATCH v13 11/11] box/journal: redesign journal operations Cyrill Gorcunov
2020-03-19 10:37   ` Konstantin Osipov
2020-03-19 10:49     ` Cyrill Gorcunov
2020-03-19 11:12       ` Konstantin Osipov
2020-03-19 11:17         ` Cyrill Gorcunov
2020-03-19 16:20         ` Cyrill Gorcunov

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox