Tarantool development patches archive
* [tarantool-patches] [PATCH v3 00/14] Parallel applier
@ 2019-06-09 20:44 Georgy Kirichenko
  2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 01/14] txn: Fire a trigger after a transaction finalization Georgy Kirichenko
                   ` (13 more replies)
  0 siblings, 14 replies; 42+ messages in thread
From: Georgy Kirichenko @ 2019-06-09 20:44 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

This patch set introduces parallel applier processing on top of the
asynchronous transaction feature, together with the fixes essential
for it.

This patch set is focused on three major goals:
 1. To make a transaction and a fiber independent of each other.
 2. To make transaction commit asynchronous.
 3. To allow the applier to apply transactions in parallel.

Changes in v3:
 - Add a parallel applier implementation
 - Store transaction related data such as ddl trigger to a transaction
   memory region
 - Fix privileges defining
 - Minor fixes according to review

Changes in v2:
 - Three patches are merged to master
 - Replicated rows are also copied to txn region
 - A fiber to process tx_prio endpoint
 - Engine commit and triggers are changed in order
 - Minor fixes according to review


Georgy Kirichenko (14):
  txn: Fire a trigger after a transaction finalization
  ddl: synchronize privileges cache with actual data state.
  txn: transaction memory allocation
  ddl: place alter structures onto a txn memory region
  txn: get rid of autocommit from a txn structure
  txn: get rid of fiber_gc from txn_rollback
  wal: remove fiber from a journal_entry structure
  wal: enable asynchronous wal writes
  wal: a dedicated wal scheduling fiber
  core: latch_unlock_external routine
  txn: introduce asynchronous txn commit
  txn: handle fiber stop event at transaction level
  applier: apply transaction in parallel
  test: fix flaky test

 src/box/alter.cc                      |  47 ++-
 src/box/applier.cc                    | 395 ++++++++++++++++++--------
 src/box/applier.h                     |   4 +
 src/box/box.cc                        | 131 +++++++--
 src/box/call.c                        |  22 +-
 src/box/index.cc                      |  10 +-
 src/box/journal.c                     |  34 ++-
 src/box/journal.h                     |  51 +++-
 src/box/memtx_engine.c                |  16 +-
 src/box/memtx_space.c                 |   8 +-
 src/box/sql.c                         |   2 +-
 src/box/txn.c                         | 294 +++++++++++--------
 src/box/txn.h                         |  45 ++-
 src/box/vinyl.c                       |  18 +-
 src/box/vy_log.c                      |   2 +-
 src/box/vy_scheduler.c                |  16 +-
 src/box/wal.c                         | 137 +++++++--
 src/lib/core/latch.h                  |  13 +-
 test/box/ddl.result                   |  37 +++
 test/box/ddl.test.lua                 |  15 +
 test/replication/sync.result          |   7 +-
 test/replication/sync.test.lua        |   4 +-
 test/replication/transaction.result   |  16 +-
 test/replication/transaction.test.lua |   7 +-
 24 files changed, 939 insertions(+), 392 deletions(-)

-- 
2.21.0

^ permalink raw reply	[flat|nested] 42+ messages in thread

* [tarantool-patches] [PATCH v3 01/14] txn: Fire a trigger after a transaction finalization
  2019-06-09 20:44 [tarantool-patches] [PATCH v3 00/14] Parallel applier Georgy Kirichenko
@ 2019-06-09 20:44 ` Georgy Kirichenko
  2019-06-09 21:59   ` [tarantool-patches] " Konstantin Osipov
  2019-06-11 11:42   ` [tarantool-patches] " Vladimir Davydov
  2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 02/14] ddl: synchronize privileges cache with actual data state Georgy Kirichenko
                   ` (12 subsequent siblings)
  13 siblings, 2 replies; 42+ messages in thread
From: Georgy Kirichenko @ 2019-06-09 20:44 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

Fire the transaction triggers after the transaction is finalized. This
way the triggers do not observe changes dismissed by a rollback.

Fixes: #4276
---
 src/box/txn.c         | 16 ++++++++--------
 test/box/ddl.result   | 37 +++++++++++++++++++++++++++++++++++++
 test/box/ddl.test.lua | 15 +++++++++++++++
 3 files changed, 60 insertions(+), 8 deletions(-)

diff --git a/src/box/txn.c b/src/box/txn.c
index da749d7cc..1d8271e51 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -410,6 +410,12 @@ txn_commit(struct txn *txn)
 		if (txn->signature < 0)
 			goto fail;
 	}
+	/*
+	 * Engine can be NULL if transaction contains IPROTO_NOP
+	 * statements only.
+	 */
+	if (txn->engine != NULL)
+		engine_commit(txn->engine, txn);
 	/*
 	 * The transaction is in the binary log. No action below
 	 * may throw. In case an error has happened, there is
@@ -421,12 +427,6 @@ txn_commit(struct txn *txn)
 		unreachable();
 		panic("commit trigger failed");
 	}
-	/*
-	 * Engine can be NULL if transaction contains IPROTO_NOP
-	 * statements only.
-	 */
-	if (txn->engine != NULL)
-		engine_commit(txn->engine, txn);
 
 	struct txn_stmt *stmt;
 	stailq_foreach_entry(stmt, &txn->stmts, next)
@@ -458,6 +458,8 @@ txn_rollback()
 	struct txn *txn = in_txn();
 	if (txn == NULL)
 		return;
+	if (txn->engine)
+		engine_rollback(txn->engine, txn);
 	/* Rollback triggers must not throw. */
 	if (txn->has_triggers &&
 	    trigger_run(&txn->on_rollback, txn) != 0) {
@@ -465,8 +467,6 @@ txn_rollback()
 		unreachable();
 		panic("rollback trigger failed");
 	}
-	if (txn->engine)
-		engine_rollback(txn->engine, txn);
 
 	struct txn_stmt *stmt;
 	stailq_foreach_entry(stmt, &txn->stmts, next)
diff --git a/test/box/ddl.result b/test/box/ddl.result
index b995b1493..9c82ce2b2 100644
--- a/test/box/ddl.result
+++ b/test/box/ddl.result
@@ -1208,3 +1208,40 @@ _ = c:get()
 test_latch:drop() -- this is where everything stops
 ---
 ...
+-- gh-4276 - check grant privilege rollback
+_ = box.schema.user.create('testg')
+---
+...
+_ = box.schema.space.create('testg'):create_index('pk')
+---
+...
+box.error.injection.set('ERRINJ_WAL_IO', true)
+---
+- ok
+...
+-- the grant operation above fails and test hasn't any space test permissions
+box.schema.user.grant('testg', 'read,write', 'space', 'testg')
+---
+- error: Failed to write to disk
+...
+-- switch user and check they couldn't select
+box.session.su('testg')
+---
+...
+box.space.testg:select()
+---
+- error: Read access to space 'testg' is denied for user 'testg'
+...
+box.session.su('admin')
+---
+...
+box.error.injection.set('ERRINJ_WAL_IO', false)
+---
+- ok
+...
+box.schema.user.drop('testg')
+---
+...
+box.space.testg:drop()
+---
+...
diff --git a/test/box/ddl.test.lua b/test/box/ddl.test.lua
index 101bc6f9b..301f7e6c1 100644
--- a/test/box/ddl.test.lua
+++ b/test/box/ddl.test.lua
@@ -270,3 +270,18 @@ box.rollback()
 
 _ = c:get()
 test_latch:drop() -- this is where everything stops
+
+-- gh-4276 - check grant privilege rollback
+_ = box.schema.user.create('testg')
+_ = box.schema.space.create('testg'):create_index('pk')
+
+box.error.injection.set('ERRINJ_WAL_IO', true)
+-- the grant operation above fails and test hasn't any space test permissions
+box.schema.user.grant('testg', 'read,write', 'space', 'testg')
+-- switch user and check they couldn't select
+box.session.su('testg')
+box.space.testg:select()
+box.session.su('admin')
+box.error.injection.set('ERRINJ_WAL_IO', false)
+box.schema.user.drop('testg')
+box.space.testg:drop()
-- 
2.21.0


* [tarantool-patches] [PATCH v3 02/14] ddl: synchronize privileges cache with actual data state.
  2019-06-09 20:44 [tarantool-patches] [PATCH v3 00/14] Parallel applier Georgy Kirichenko
  2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 01/14] txn: Fire a trigger after a transaction finalization Georgy Kirichenko
@ 2019-06-09 20:44 ` Georgy Kirichenko
  2019-06-11 13:13   ` Vladimir Davydov
  2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 03/14] txn: transaction memory allocation Georgy Kirichenko
                   ` (11 subsequent siblings)
  13 siblings, 1 reply; 42+ messages in thread
From: Georgy Kirichenko @ 2019-06-09 20:44 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

Keep the cached data in sync with the actual data changes: apply them
in on_replace and undo them in on_rollback.
---
 src/box/alter.cc | 37 ++++++++++++++++---------------------
 1 file changed, 16 insertions(+), 21 deletions(-)

diff --git a/src/box/alter.cc b/src/box/alter.cc
index ed9e55907..c4a1c52a9 100644
--- a/src/box/alter.cc
+++ b/src/box/alter.cc
@@ -2938,31 +2938,23 @@ grant_or_revoke(struct priv_def *priv)
 
 /** A trigger called on rollback of grant, or on commit of revoke. */
 static void
-revoke_priv(struct trigger * /* trigger */, void *event)
+revoke_priv(struct trigger *trigger, void *event)
 {
-	struct txn *txn = (struct txn *) event;
-	struct txn_stmt *stmt = txn_last_stmt(txn);
-	struct tuple *tuple = (stmt->new_tuple ?
-			       stmt->new_tuple : stmt->old_tuple);
+	(void) event;
+	struct tuple *tuple = (struct tuple *)trigger->data;
 	struct priv_def priv;
 	priv_def_create_from_tuple(&priv, tuple);
-	/*
-	 * Access to the object has been removed altogether so
-	 * there should be no grants at all. If only some grants
-	 * were removed, modify_priv trigger would have been
-	 * invoked.
-	 */
 	priv.access = 0;
 	grant_or_revoke(&priv);
 }
 
 /** A trigger called on rollback of grant, or on commit of revoke. */
 static void
-modify_priv(struct trigger * /* trigger */, void *event)
+modify_priv(struct trigger *trigger, void *event)
 {
-	struct txn_stmt *stmt = txn_last_stmt((struct txn *) event);
+	(void) event;
 	struct priv_def priv;
-	priv_def_create_from_tuple(&priv, stmt->new_tuple);
+	priv_def_create_from_tuple(&priv, (struct tuple *)trigger->data);
 	grant_or_revoke(&priv);
 }
 
@@ -2985,21 +2977,24 @@ on_replace_dd_priv(struct trigger * /* trigger */, void *event)
 		priv_def_check(&priv, PRIV_GRANT);
 		grant_or_revoke(&priv);
 		struct trigger *on_rollback =
-			txn_alter_trigger_new(revoke_priv, NULL);
+			txn_alter_trigger_new(revoke_priv, new_tuple);
 		txn_on_rollback(txn, on_rollback);
 	} else if (new_tuple == NULL) {                /* revoke */
 		assert(old_tuple);
 		priv_def_create_from_tuple(&priv, old_tuple);
 		priv_def_check(&priv, PRIV_REVOKE);
-		struct trigger *on_commit =
-			txn_alter_trigger_new(revoke_priv, NULL);
-		txn_on_commit(txn, on_commit);
+		priv.access = 0;
+		grant_or_revoke(&priv);
+		struct trigger *on_rollback =
+			txn_alter_trigger_new(modify_priv, old_tuple);
+		txn_on_rollback(txn, on_rollback);
 	} else {                                       /* modify */
 		priv_def_create_from_tuple(&priv, new_tuple);
 		priv_def_check(&priv, PRIV_GRANT);
-		struct trigger *on_commit =
-			txn_alter_trigger_new(modify_priv, NULL);
-		txn_on_commit(txn, on_commit);
+		grant_or_revoke(&priv);
+		struct trigger *on_rollback =
+			txn_alter_trigger_new(modify_priv, old_tuple);
+		txn_on_rollback(txn, on_rollback);
 	}
 }
 
-- 
2.21.0


* [tarantool-patches] [PATCH v3 03/14] txn: transaction memory allocation
  2019-06-09 20:44 [tarantool-patches] [PATCH v3 00/14] Parallel applier Georgy Kirichenko
  2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 01/14] txn: Fire a trigger after a transaction finalization Georgy Kirichenko
  2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 02/14] ddl: synchronize privileges cache with actual data state Georgy Kirichenko
@ 2019-06-09 20:44 ` Georgy Kirichenko
  2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 04/14] ddl: place alter structures onto a txn memory region Georgy Kirichenko
                   ` (10 subsequent siblings)
  13 siblings, 0 replies; 42+ messages in thread
From: Georgy Kirichenko @ 2019-06-09 20:44 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

Allow allocating a chunk of memory on the transaction memory region.

Prerequisites: #1254
---
 src/box/txn.h | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/src/box/txn.h b/src/box/txn.h
index f4d861824..196a7dcc1 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -393,6 +393,15 @@ txn_vdbe()
 	return txn->psql_txn->vdbe;
 }
 
+/**
+ * Allocate a chunk of memory on a txn memory region.
+ */
+static inline void *
+txn_alloc(struct txn *txn, size_t size)
+{
+	return region_alloc(&txn->region, size);
+}
+
 /**
  * FFI bindings: do not throw exceptions, do not accept extra
  * arguments
-- 
2.21.0


* [tarantool-patches] [PATCH v3 04/14] ddl: place alter structures onto a txn memory region
  2019-06-09 20:44 [tarantool-patches] [PATCH v3 00/14] Parallel applier Georgy Kirichenko
                   ` (2 preceding siblings ...)
  2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 03/14] txn: transaction memory allocation Georgy Kirichenko
@ 2019-06-09 20:44 ` Georgy Kirichenko
  2019-06-11 14:14   ` Vladimir Davydov
  2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 05/14] txn: get rid of autocommit from a txn structure Georgy Kirichenko
                   ` (9 subsequent siblings)
  13 siblings, 1 reply; 42+ messages in thread
From: Georgy Kirichenko @ 2019-06-09 20:44 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

As the lifecycle of alter-schema triggers is bound to a transaction,
the corresponding structures should be placed on the txn memory
region instead of the fiber gc region.

Prerequisites: #1254
---
 src/box/alter.cc |  8 ++++----
 src/box/vinyl.c  | 10 ++++------
 2 files changed, 8 insertions(+), 10 deletions(-)

diff --git a/src/box/alter.cc b/src/box/alter.cc
index c4a1c52a9..671209b51 100644
--- a/src/box/alter.cc
+++ b/src/box/alter.cc
@@ -668,7 +668,7 @@ public:
 
 	void *operator new(size_t size)
 	{
-		return region_aligned_calloc_xc(&fiber()->gc, size,
+		return region_aligned_calloc_xc(&in_txn()->region, size,
 						alignof(uint64_t));
 	}
 	void operator delete(void * /* ptr */) {}
@@ -682,7 +682,7 @@ static struct trigger *
 txn_alter_trigger_new(trigger_f run, void *data)
 {
 	struct trigger *trigger = (struct trigger *)
-		region_calloc_object_xc(&fiber()->gc, struct trigger);
+		txn_alloc(in_txn(), sizeof(struct trigger));
 	trigger->run = run;
 	trigger->data = data;
 	trigger->destroy = NULL;
@@ -718,7 +718,7 @@ static struct alter_space *
 alter_space_new(struct space *old_space)
 {
 	struct alter_space *alter =
-		region_calloc_object_xc(&fiber()->gc, struct alter_space);
+		region_calloc_object_xc(&in_txn()->region, struct alter_space);
 	rlist_create(&alter->ops);
 	alter->old_space = old_space;
 	alter->space_def = space_def_dup_xc(alter->old_space->def);
@@ -3246,7 +3246,7 @@ on_replace_dd_sequence(struct trigger * /* trigger */, void *event)
 	struct tuple *new_tuple = stmt->new_tuple;
 
 	struct alter_sequence *alter =
-		region_calloc_object_xc(&fiber()->gc, struct alter_sequence);
+		region_calloc_object_xc(&txn->region, struct alter_sequence);
 
 	struct sequence_def *new_def = NULL;
 	auto def_guard = make_scoped_guard([=] { free(new_def); });
diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index 8286fed7c..e37631064 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -4618,19 +4618,17 @@ vy_deferred_delete_on_replace(struct trigger *trigger, void *event)
 		 * which will propagate the WAL row LSN to
 		 * the LSM tree.
 		 */
-		struct trigger *on_commit = region_alloc(&fiber()->gc,
-							 sizeof(*on_commit));
+		struct trigger *on_commit = txn_alloc(txn, sizeof(*on_commit));
 		if (on_commit == NULL) {
 			diag_set(OutOfMemory, sizeof(*on_commit),
-				 "region", "struct trigger");
+				 "txn region", "struct trigger");
 			rc = -1;
 			break;
 		}
-		struct trigger *on_rollback = region_alloc(&fiber()->gc,
-							   sizeof(*on_commit));
+		struct trigger *on_rollback = txn_alloc(txn, sizeof(*on_commit));
 		if (on_rollback == NULL) {
 			diag_set(OutOfMemory, sizeof(*on_commit),
-				 "region", "struct trigger");
+				 "txn region", "struct trigger");
 			rc = -1;
 			break;
 		}
-- 
2.21.0


* [tarantool-patches] [PATCH v3 05/14] txn: get rid of autocommit from a txn structure
  2019-06-09 20:44 [tarantool-patches] [PATCH v3 00/14] Parallel applier Georgy Kirichenko
                   ` (3 preceding siblings ...)
  2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 04/14] ddl: place alter structures onto a txn memory region Georgy Kirichenko
@ 2019-06-09 20:44 ` Georgy Kirichenko
  2019-06-13 14:11   ` Vladimir Davydov
  2019-06-16 16:14   ` Konstantin Osipov
  2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 06/14] txn: get rid of fiber_gc from txn_rollback Georgy Kirichenko
                   ` (8 subsequent siblings)
  13 siblings, 2 replies; 42+ messages in thread
From: Georgy Kirichenko @ 2019-06-09 20:44 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

Move the transaction auto-start and auto-commit behavior to the box
level. From now on a transaction does not start or commit automatically
without explicit txn_begin/txn_commit invocations. This is part of a
bigger transaction refactoring aimed at detachable transactions and a
parallel applier.

Prerequisites: #1254
---
 src/box/applier.cc     | 35 +++++++++++++---
 src/box/box.cc         | 94 +++++++++++++++++++++++++++++++++---------
 src/box/index.cc       | 10 ++---
 src/box/memtx_engine.c | 10 ++++-
 src/box/memtx_space.c  |  8 ++--
 src/box/sql.c          |  2 +-
 src/box/txn.c          | 46 +++++++--------------
 src/box/txn.h          | 16 +++----
 src/box/vinyl.c        | 12 ++----
 src/box/vy_scheduler.c |  6 +--
 10 files changed, 148 insertions(+), 91 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 373e1feb9..e3203a4c8 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -172,11 +172,26 @@ applier_writer_f(va_list ap)
 static int
 apply_initial_join_row(struct xrow_header *row)
 {
+	struct txn *txn = txn_begin();
+	if (txn == NULL)
+		return -1;
 	struct request request;
 	xrow_decode_dml(row, &request, dml_request_key_map(row->type));
-	struct space *space = space_cache_find_xc(request.space_id);
+	struct space *space = space_cache_find(request.space_id);
+	if (space == NULL)
+		goto rollback;
 	/* no access checks here - applier always works with admin privs */
-	return space_apply_initial_join_row(space, &request);
+	if (space_apply_initial_join_row(space, &request))
+		goto rollback;
+	int rc;
+	rc = txn_commit(txn);
+	if (rc < 0)
+		return -1;
+	fiber_gc();
+	return rc;
+rollback:
+	txn_rollback();
+	return -1;
 }
 
 /**
@@ -189,8 +204,8 @@ static int
 process_nop(struct request *request)
 {
 	assert(request->type == IPROTO_NOP);
-	struct txn *txn = txn_begin_stmt(NULL);
-	if (txn == NULL)
+	struct txn *txn = in_txn();
+	if (txn_begin_stmt(txn, NULL) == NULL)
 		return -1;
 	return txn_commit_stmt(txn, request);
 }
@@ -403,8 +418,16 @@ applier_join(struct applier *applier)
 		applier->last_row_time = ev_monotonic_now(loop());
 		if (iproto_type_is_dml(row.type)) {
 			vclock_follow_xrow(&replicaset.vclock, &row);
-			if (apply_row(&row) != 0)
+			struct txn *txn = txn_begin();
+			if (txn == NULL)
+				diag_raise();
+			if (apply_row(&row) != 0) {
+				txn_rollback();
+				diag_raise();
+			}
+			if (txn_commit(txn) != 0)
 				diag_raise();
+			fiber_gc();
 			if (++row_count % 100000 == 0)
 				say_info("%.1fM rows received", row_count / 1e6);
 		} else if (row.type == IPROTO_OK) {
@@ -555,7 +578,7 @@ applier_apply_tx(struct stailq *rows)
 	 * conflict safely access failed xrow object and allocate
 	 * IPROTO_NOP on gc.
 	 */
-	struct txn *txn = txn_begin(false);
+	struct txn *txn = txn_begin();
 	struct applier_tx_row *item;
 	if (txn == NULL)
 		diag_raise();
diff --git a/src/box/box.cc b/src/box/box.cc
index 57419ee01..7f23716e5 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -169,34 +169,62 @@ int
 box_process_rw(struct request *request, struct space *space,
 	       struct tuple **result)
 {
+	struct tuple *tuple = NULL;
+	struct txn *txn = in_txn();
+	bool is_autocommit = txn == NULL;
+	if (is_autocommit && (txn = txn_begin()) == NULL)
+		return -1;
 	assert(iproto_type_is_dml(request->type));
 	rmean_collect(rmean_box, request->type, 1);
 	if (access_check_space(space, PRIV_W) != 0)
-		return -1;
-	struct txn *txn = txn_begin_stmt(space);
-	if (txn == NULL)
-		return -1;
-	struct tuple *tuple;
+		goto fail;
+	if (txn_begin_stmt(txn, space) == NULL)
+		goto fail;
 	if (space_execute_dml(space, txn, request, &tuple) != 0) {
-		txn_rollback_stmt();
-		return -1;
+		txn_rollback_stmt(txn);
+		goto fail;
+	}
+	if (result != NULL)
+		*result = tuple;
+
+	if (result == NULL || tuple == NULL) {
+		if (txn_commit_stmt(txn, request) != 0)
+			goto fail;
+		if (is_autocommit) {
+			if (txn_commit(txn) != 0)
+				return -1;
+			fiber_gc();
+		}
+		return 0;
 	}
-	if (result == NULL)
-		return txn_commit_stmt(txn, request);
-	*result = tuple;
-	if (tuple == NULL)
-		return txn_commit_stmt(txn, request);
 	/*
 	 * Pin the tuple locally before the commit,
 	 * otherwise it may go away during yield in
 	 * when WAL is written in autocommit mode.
 	 */
 	tuple_ref(tuple);
-	int rc = txn_commit_stmt(txn, request);
-	if (rc == 0)
-		tuple_bless(tuple);
+
+	if (txn_commit_stmt(txn, request)) {
+		/* Unref tuple and rollback if autocommit. */
+		tuple_unref(tuple);
+		goto fail;
+	}
+	if (is_autocommit) {
+		if (txn_commit(txn) != 0) {
+			/* Unref tuple and exit. */
+			tuple_unref(tuple);
+			return -1;
+		}
+	        fiber_gc();
+	}
+	tuple_bless(tuple);
 	tuple_unref(tuple);
-	return rc;
+	return 0;
+
+fail:
+	if (is_autocommit)
+		txn_rollback();
+	return -1;
 }
 
 void
@@ -299,10 +327,20 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row)
 	xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
 	if (request.type != IPROTO_NOP) {
 		struct space *space = space_cache_find_xc(request.space_id);
+		struct txn *txn = txn_begin();
+		if (txn == NULL) {
+			say_error("error applying row: %s", request_str(&request));
+			diag_raise();
+		}
 		if (box_process_rw(&request, space, NULL) != 0) {
 			say_error("error applying row: %s", request_str(&request));
+			txn_rollback();
 			diag_raise();
 		}
+		if (txn_commit(txn) != 0) {
+			diag_raise();
+		}
+		fiber_gc();
 	}
 	struct wal_stream *xstream =
 		container_of(stream, struct wal_stream, base);
@@ -1055,7 +1093,7 @@ box_select(uint32_t space_id, uint32_t index_id,
 	struct iterator *it = index_create_iterator(index, type,
 						    key, part_count);
 	if (it == NULL) {
-		txn_rollback_stmt();
+		txn_rollback_stmt(txn);
 		return -1;
 	}
 
@@ -1080,7 +1118,7 @@ box_select(uint32_t space_id, uint32_t index_id,
 
 	if (rc != 0) {
 		port_destroy(port);
-		txn_rollback_stmt();
+		txn_rollback_stmt(txn);
 		return -1;
 	}
 	txn_commit_ro_stmt(txn);
@@ -1313,9 +1351,17 @@ box_sequence_reset(uint32_t seq_id)
 static inline void
 box_register_replica(uint32_t id, const struct tt_uuid *uuid)
 {
+	struct txn *txn = txn_begin();
+	if (txn == NULL)
+		diag_raise();
 	if (boxk(IPROTO_INSERT, BOX_CLUSTER_ID, "[%u%s]",
-		 (unsigned) id, tt_uuid_str(uuid)) != 0)
+		 (unsigned) id, tt_uuid_str(uuid)) != 0) {
+		txn_rollback();
 		diag_raise();
+	}
+	if (txn_commit(txn) != 0)
+		diag_raise();
+	fiber_gc();
 	assert(replica_by_uuid(uuid)->id == id);
 }
 
@@ -1636,10 +1682,18 @@ box_set_replicaset_uuid(const struct tt_uuid *replicaset_uuid)
 		uu = *replicaset_uuid;
 	else
 		tt_uuid_create(&uu);
+	struct txn *txn = txn_begin();
+	if (txn == NULL)
+		diag_raise();
 	/* Save replica set UUID in _schema */
 	if (boxk(IPROTO_INSERT, BOX_SCHEMA_ID, "[%s%s]", "cluster",
-		 tt_uuid_str(&uu)))
+		 tt_uuid_str(&uu))) {
+		txn_rollback();
+		diag_raise();
+	}
+	if (txn_commit(txn) != 0)
 		diag_raise();
+	fiber_gc();
 }
 
 void
diff --git a/src/box/index.cc b/src/box/index.cc
index 4a444e5d0..7f26c9bc2 100644
--- a/src/box/index.cc
+++ b/src/box/index.cc
@@ -240,7 +240,7 @@ box_index_get(uint32_t space_id, uint32_t index_id, const char *key,
 	if (txn_begin_ro_stmt(space, &txn) != 0)
 		return -1;
 	if (index_get(index, key, part_count, result) != 0) {
-		txn_rollback_stmt();
+		txn_rollback_stmt(txn);
 		return -1;
 	}
 	txn_commit_ro_stmt(txn);
@@ -274,7 +274,7 @@ box_index_min(uint32_t space_id, uint32_t index_id, const char *key,
 	if (txn_begin_ro_stmt(space, &txn) != 0)
 		return -1;
 	if (index_min(index, key, part_count, result) != 0) {
-		txn_rollback_stmt();
+		txn_rollback_stmt(txn);
 		return -1;
 	}
 	txn_commit_ro_stmt(txn);
@@ -306,7 +306,7 @@ box_index_max(uint32_t space_id, uint32_t index_id, const char *key,
 	if (txn_begin_ro_stmt(space, &txn) != 0)
 		return -1;
 	if (index_max(index, key, part_count, result) != 0) {
-		txn_rollback_stmt();
+		txn_rollback_stmt(txn);
 		return -1;
 	}
 	txn_commit_ro_stmt(txn);
@@ -340,7 +340,7 @@ box_index_count(uint32_t space_id, uint32_t index_id, int type,
 		return -1;
 	ssize_t count = index_count(index, itype, key, part_count);
 	if (count < 0) {
-		txn_rollback_stmt();
+		txn_rollback_stmt(txn);
 		return -1;
 	}
 	txn_commit_ro_stmt(txn);
@@ -377,7 +377,7 @@ box_index_iterator(uint32_t space_id, uint32_t index_id, int type,
 	struct iterator *it = index_create_iterator(index, itype,
 						    key, part_count);
 	if (it == NULL) {
-		txn_rollback_stmt();
+		txn_rollback_stmt(txn);
 		return NULL;
 	}
 	txn_commit_ro_stmt(txn);
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index f4312484a..149215b87 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -272,16 +272,22 @@ memtx_engine_recover_snapshot_row(struct memtx_engine *memtx,
 		diag_set(ClientError, ER_CROSS_ENGINE_TRANSACTION);
 		return -1;
 	}
+	struct txn *txn = txn_begin();
+	if (txn == NULL)
+		return -1;
 	/* no access checks here - applier always works with admin privs */
-	if (space_apply_initial_join_row(space, &request) != 0)
+	if (space_apply_initial_join_row(space, &request) != 0) {
+		txn_rollback();
 		return -1;
+	}
+	int rc = txn_commit(txn);
 	/*
 	 * Don't let gc pool grow too much. Yet to
 	 * it before reading the next row, to make
 	 * sure it's not freed along here.
 	 */
 	fiber_gc();
-	return 0;
+	return rc;
 }
 
 /** Called at start to tell memtx to recover to a given LSN. */
diff --git a/src/box/memtx_space.c b/src/box/memtx_space.c
index 1d209033c..1e630783b 100644
--- a/src/box/memtx_space.c
+++ b/src/box/memtx_space.c
@@ -310,10 +310,10 @@ memtx_space_apply_initial_join_row(struct space *space, struct request *request)
 		return -1;
 	}
 	request->header->replica_id = 0;
-	struct txn *txn = txn_begin_stmt(space);
-	if (txn == NULL)
+	struct txn *txn = in_txn();
+	struct txn_stmt *stmt = txn_begin_stmt(txn, space);
+	if (stmt == NULL)
 		return -1;
-	struct txn_stmt *stmt = txn_current_stmt(txn);
 	stmt->new_tuple = memtx_tuple_new(space->format, request->tuple,
 					  request->tuple_end);
 	if (stmt->new_tuple == NULL)
@@ -326,7 +326,7 @@ memtx_space_apply_initial_join_row(struct space *space, struct request *request)
 
 rollback:
 	say_error("rollback: %s", diag_last_error(diag_get())->errmsg);
-	txn_rollback_stmt();
+	txn_rollback_stmt(txn);
 	return -1;
 }
 
diff --git a/src/box/sql.c b/src/box/sql.c
index fbfa59992..1bdbdb815 100644
--- a/src/box/sql.c
+++ b/src/box/sql.c
@@ -915,7 +915,7 @@ cursor_seek(BtCursor *pCur, int *pRes)
 				      part_count);
 	if (it == NULL) {
 		if (txn != NULL)
-			txn_rollback_stmt();
+			txn_rollback_stmt(txn);
 		pCur->eState = CURSOR_INVALID;
 		return SQL_TARANTOOL_ITERATOR_FAIL;
 	}
diff --git a/src/box/txn.c b/src/box/txn.c
index 1d8271e51..21f34e526 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -174,7 +174,7 @@ txn_free(struct txn *txn)
 }
 
 struct txn *
-txn_begin(bool is_autocommit)
+txn_begin()
 {
 	static int64_t tsn = 0;
 	assert(! in_txn());
@@ -187,7 +187,6 @@ txn_begin(bool is_autocommit)
 	txn->n_new_rows = 0;
 	txn->n_local_rows = 0;
 	txn->n_applier_rows = 0;
-	txn->is_autocommit = is_autocommit;
 	txn->has_triggers  = false;
 	txn->is_aborted = false;
 	txn->in_sub_stmt = 0;
@@ -220,26 +219,20 @@ txn_begin_in_engine(struct engine *engine, struct txn *txn)
 	return 0;
 }
 
-struct txn *
-txn_begin_stmt(struct space *space)
+struct txn_stmt *
+txn_begin_stmt(struct txn *txn, struct space *space)
 {
-	struct txn *txn = in_txn();
-	if (txn == NULL) {
-		txn = txn_begin(true);
-		if (txn == NULL)
-			return NULL;
-	} else if (txn->in_sub_stmt > TXN_SUB_STMT_MAX) {
+	assert(txn == in_txn());
+	assert(txn != NULL);
+	if (txn->in_sub_stmt > TXN_SUB_STMT_MAX) {
 		diag_set(ClientError, ER_SUB_STMT_MAX);
 		return NULL;
 	}
 	struct txn_stmt *stmt = txn_stmt_new(txn);
-	if (stmt == NULL) {
-		if (txn->is_autocommit && txn->in_sub_stmt == 0)
-			txn_rollback();
+	if (stmt == NULL)
 		return NULL;
-	}
 	if (space == NULL)
-		return txn;
+		return stmt;
 
 	if (trigger_run(&space->on_stmt_begin, txn) != 0)
 		goto fail;
@@ -252,9 +245,9 @@ txn_begin_stmt(struct space *space)
 	if (engine_begin_statement(engine, txn) != 0)
 		goto fail;
 
-	return txn;
+	return stmt;
 fail:
-	txn_rollback_stmt();
+	txn_rollback_stmt(txn);
 	return NULL;
 }
 
@@ -272,8 +265,7 @@ txn_is_distributed(struct txn *txn)
 }
 
 /**
- * End a statement. In autocommit mode, end
- * the current transaction as well.
+ * End a statement.
  */
 int
 txn_commit_stmt(struct txn *txn, struct request *request)
@@ -316,14 +308,9 @@ txn_commit_stmt(struct txn *txn, struct request *request)
 			goto fail;
 	}
 	--txn->in_sub_stmt;
-	if (txn->is_autocommit && txn->in_sub_stmt == 0) {
-		int rc = txn_commit(txn);
-		fiber_gc();
-		return rc;
-	}
 	return 0;
 fail:
-	txn_rollback_stmt();
+	txn_rollback_stmt(txn);
 	return -1;
 }
 
@@ -359,7 +346,7 @@ txn_write_to_wal(struct txn *txn)
 
 	if (res < 0) {
 		/* Cascading rollback. */
-		txn_rollback(); /* Perform our part of cascading rollback. */
+		txn_rollback(txn); /* Perform our part of cascading rollback. */
 		/*
 		 * Move fiber to end of event loop to avoid
 		 * execution of any new requests before all
@@ -441,14 +428,11 @@ fail:
 }
 
 void
-txn_rollback_stmt()
+txn_rollback_stmt(struct txn *txn)
 {
-	struct txn *txn = in_txn();
 	if (txn == NULL || txn->in_sub_stmt == 0)
 		return;
 	txn->in_sub_stmt--;
-	if (txn->is_autocommit && txn->in_sub_stmt == 0)
-		return txn_rollback();
 	txn_rollback_to_svp(txn, txn->sub_stmt_begin[txn->in_sub_stmt]);
 }
 
@@ -519,7 +503,7 @@ box_txn_begin()
 		diag_set(ClientError, ER_ACTIVE_TRANSACTION);
 		return -1;
 	}
-	if (txn_begin(false) == NULL)
+	if (txn_begin() == NULL)
 		return -1;
 	return 0;
 }
diff --git a/src/box/txn.h b/src/box/txn.h
index 196a7dcc1..ff2018d57 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -162,11 +162,6 @@ struct txn {
 	 * already assigned LSN.
 	 */
 	int n_applier_rows;
-	/**
-	 * True if this transaction is running in autocommit mode
-	 * (statement end causes an automatic transaction commit).
-	 */
-	bool is_autocommit;
 	/**
 	 * True if the transaction was aborted so should be
 	 * rolled back at commit.
@@ -214,7 +209,7 @@ in_txn()
  * @pre no transaction is active
  */
 struct txn *
-txn_begin(bool is_autocommit);
+txn_begin();
 
 /**
  * Commit a transaction.
@@ -271,11 +266,10 @@ txn_on_rollback(struct txn *txn, struct trigger *trigger)
 }
 
 /**
- * Start a new statement. If no current transaction,
- * start a new transaction with autocommit = true.
+ * Start a new statement.
  */
-struct txn *
-txn_begin_stmt(struct space *space);
+struct txn_stmt *
+txn_begin_stmt(struct txn *txn, struct space *space);
 
 int
 txn_begin_in_engine(struct engine *engine, struct txn *txn);
@@ -334,7 +328,7 @@ txn_commit_stmt(struct txn *txn, struct request *request);
  * rolls back the entire transaction.
  */
 void
-txn_rollback_stmt();
+txn_rollback_stmt(struct txn *txn);
 
 /**
  * Raise an error if this is a multi-statement transaction: DDL
diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index e37631064..c61f9477e 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -2414,10 +2414,8 @@ vinyl_engine_begin(struct engine *engine, struct txn *txn)
 	txn->engine_tx = vy_tx_begin(env->xm);
 	if (txn->engine_tx == NULL)
 		return -1;
-	if (!txn->is_autocommit) {
-		trigger_create(&txn->fiber_on_stop, txn_on_stop, NULL, NULL);
-		trigger_add(&fiber()->on_stop, &txn->fiber_on_stop);
-	}
+	trigger_create(&txn->fiber_on_stop, txn_on_stop, NULL, NULL);
+	trigger_add(&fiber()->on_stop, &txn->fiber_on_stop);
 	return 0;
 }
 
@@ -2487,8 +2485,7 @@ vinyl_engine_commit(struct engine *engine, struct txn *txn)
 	vy_regulator_check_dump_watermark(&env->regulator);
 
 	txn->engine_tx = NULL;
-	if (!txn->is_autocommit)
-		trigger_clear(&txn->fiber_on_stop);
+	trigger_clear(&txn->fiber_on_stop);
 }
 
 static void
@@ -2502,8 +2499,7 @@ vinyl_engine_rollback(struct engine *engine, struct txn *txn)
 	vy_tx_rollback(tx);
 
 	txn->engine_tx = NULL;
-	if (!txn->is_autocommit)
-		trigger_clear(&txn->fiber_on_stop);
+	trigger_clear(&txn->fiber_on_stop);
 }
 
 static int
diff --git a/src/box/vy_scheduler.c b/src/box/vy_scheduler.c
index fabb4bb48..9d5f18e32 100644
--- a/src/box/vy_scheduler.c
+++ b/src/box/vy_scheduler.c
@@ -840,14 +840,14 @@ vy_deferred_delete_process_one(struct space *deferred_delete_space,
 
 	tuple_unref(delete);
 
-	struct txn *txn = txn_begin_stmt(deferred_delete_space);
-	if (txn == NULL)
+	struct txn *txn = in_txn();
+	if (txn_begin_stmt(txn, deferred_delete_space) == NULL)
 		return -1;
 
 	struct tuple *unused;
 	if (space_execute_dml(deferred_delete_space, txn,
 			      &request, &unused) != 0) {
-		txn_rollback_stmt();
+		txn_rollback_stmt(txn);
 		return -1;
 	}
 	return txn_commit_stmt(txn, &request);
-- 
2.21.0

^ permalink raw reply	[flat|nested] 42+ messages in thread

* [tarantool-patches] [PATCH v3 06/14] txn: get rid of fiber_gc from txn_rollback
  2019-06-09 20:44 [tarantool-patches] [PATCH v3 00/14] Parallel applier Georgy Kirichenko
                   ` (4 preceding siblings ...)
  2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 05/14] txn: get rid of autocommit from a txn structure Georgy Kirichenko
@ 2019-06-09 20:44 ` Georgy Kirichenko
  2019-06-13 14:12   ` Vladimir Davydov
  2019-06-16 16:38   ` [tarantool-patches] " Konstantin Osipov
  2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 07/14] wal: remove fiber from a journal_entry structure Georgy Kirichenko
                   ` (7 subsequent siblings)
  13 siblings, 2 replies; 42+ messages in thread
From: Georgy Kirichenko @ 2019-06-09 20:44 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

Don't touch the fiber's gc storage on transaction rollback explicitly.
This relaxes the dependency between the fiber and transaction life
cycles: callers that still need the fiber region freed now invoke
fiber_gc() themselves after txn_rollback().
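The ownership change can be illustrated with a small mock (plain C with hypothetical names such as mock_fiber and mock_txn_rollback; these are not the real Tarantool structures): after the patch, rollback tears down only transaction state, and the caller pairs it with an explicit gc call, the way the touched call sites in applier.cc, box.cc and call.c do.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-ins for struct fiber and struct txn. */
struct mock_fiber {
	int region_used; /* bytes of volatile per-request memory */
};

struct mock_txn {
	struct mock_fiber *fiber;
	bool active;
};

/* fiber_gc(): frees the fiber's volatile memory. */
static void
mock_fiber_gc(struct mock_fiber *f)
{
	f->region_used = 0;
}

/*
 * After the patch: rollback only finalizes the transaction and
 * deliberately does NOT touch the fiber's gc storage.
 */
static void
mock_txn_rollback(struct mock_txn *txn)
{
	txn->active = false;
	/* note: no mock_fiber_gc(txn->fiber) call here anymore */
}

/* A caller that wants the old behavior composes the two explicitly. */
static void
mock_rollback_and_gc(struct mock_txn *txn)
{
	mock_txn_rollback(txn);
	mock_fiber_gc(txn->fiber);
}
```

This mirrors the diff: every caller that previously relied on txn_rollback() truncating the fiber region now follows it with fiber_gc().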

Prerequisites: #1254
---
 src/box/applier.cc     |  8 +++++---
 src/box/box.cc         | 14 +++++++++-----
 src/box/call.c         | 22 ++++++++++++++++------
 src/box/memtx_engine.c |  3 ++-
 src/box/txn.c          | 35 +++++++++++++++++------------------
 src/box/txn.h          |  8 ++++++--
 src/box/vy_scheduler.c | 10 +++++++---
 7 files changed, 62 insertions(+), 38 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index e3203a4c8..5a92f6109 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -190,7 +190,7 @@ apply_initial_join_row(struct xrow_header *row)
 	fiber_gc();
 	return rc;
 rollback:
-	txn_rollback();
+	txn_rollback(txn);
 	return -1;
 }
 
@@ -422,7 +422,8 @@ applier_join(struct applier *applier)
 			if (txn == NULL)
 				diag_raise();
 			if (apply_row(&row) != 0) {
-				txn_rollback();
+				txn_rollback(txn);
+				fiber_gc();
 				diag_raise();
 			}
 			if (txn_commit(txn) != 0)
@@ -625,7 +626,8 @@ applier_apply_tx(struct stailq *rows)
 	return txn_commit(txn);
 
 rollback:
-	txn_rollback();
+	txn_rollback(txn);
+	fiber_gc();
 	return -1;
 }
 
diff --git a/src/box/box.cc b/src/box/box.cc
index 7f23716e5..5e5cd2b08 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -222,8 +222,10 @@ box_process_rw(struct request *request, struct space *space,
 	return 0;
 
 fail:
-	if (is_autocommit)
-		txn_rollback();
+	if (is_autocommit) {
+		txn_rollback(txn);
+		fiber_gc();
+	}
 	return -1;
 }
 
@@ -334,7 +336,7 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row)
 		}
 		if (box_process_rw(&request, space, NULL) != 0) {
 			say_error("error applying row: %s", request_str(&request));
-			txn_rollback();
+			txn_rollback(txn);
 			diag_raise();
 		}
 		if (txn_commit(txn) != 0) {
@@ -1356,7 +1358,8 @@ box_register_replica(uint32_t id, const struct tt_uuid *uuid)
 		diag_raise();
 	if (boxk(IPROTO_INSERT, BOX_CLUSTER_ID, "[%u%s]",
 		 (unsigned) id, tt_uuid_str(uuid)) != 0) {
-		txn_rollback();
+		txn_rollback(txn);
+		fiber_gc();
 		diag_raise();
 	}
 	if (txn_commit(txn) != 0)
@@ -1688,7 +1691,8 @@ box_set_replicaset_uuid(const struct tt_uuid *replicaset_uuid)
 	/* Save replica set UUID in _schema */
 	if (boxk(IPROTO_INSERT, BOX_SCHEMA_ID, "[%s%s]", "cluster",
 		 tt_uuid_str(&uu))) {
-		txn_rollback();
+		txn_rollback(txn);
+		fiber_gc();
 		diag_raise();
 	}
 	if (txn_commit(txn) != 0)
diff --git a/src/box/call.c b/src/box/call.c
index 56da53fb3..7f6fc8bba 100644
--- a/src/box/call.c
+++ b/src/box/call.c
@@ -208,14 +208,18 @@ box_process_call(struct call_request *request, struct port *port)
 	if (orig_credentials)
 		fiber_set_user(fiber(), orig_credentials);
 
+	struct txn *txn = in_txn();
 	if (rc != 0) {
-		txn_rollback();
+		if (txn != NULL)
+			txn_rollback(txn);
+		fiber_gc();
 		return -1;
 	}
 
-	if (in_txn()) {
+	if (txn != NULL) {
 		diag_set(ClientError, ER_FUNCTION_TX_ACTIVE);
-		txn_rollback();
+		txn_rollback(txn);
+		fiber_gc();
 		return -1;
 	}
 
@@ -229,14 +233,20 @@ box_process_eval(struct call_request *request, struct port *port)
 	/* Check permissions */
 	if (access_check_universe(PRIV_X) != 0)
 		return -1;
+	struct txn *txn;
 	if (box_lua_eval(request, port) != 0) {
-		txn_rollback();
+		txn = in_txn();
+		if (txn != NULL)
+			txn_rollback(txn);
+		fiber_gc();
 		return -1;
 	}
 
-	if (in_txn()) {
+	txn = in_txn();
+	if (txn != NULL) {
 		diag_set(ClientError, ER_FUNCTION_TX_ACTIVE);
-		txn_rollback();
+		txn_rollback(txn);
+		fiber_gc();
 		return -1;
 	}
 
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index 149215b87..918885318 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -277,7 +277,8 @@ memtx_engine_recover_snapshot_row(struct memtx_engine *memtx,
 		return -1;
 	/* no access checks here - applier always works with admin privs */
 	if (space_apply_initial_join_row(space, &request) != 0) {
-		txn_rollback();
+		txn_rollback(txn);
+		fiber_gc();
 		return -1;
 	}
 	int rc = txn_commit(txn);
diff --git a/src/box/txn.c b/src/box/txn.c
index 21f34e526..a08652af1 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -168,6 +168,10 @@ txn_new()
 inline static void
 txn_free(struct txn *txn)
 {
+	struct txn_stmt *stmt;
+	stailq_foreach_entry(stmt, &txn->stmts, next)
+		txn_stmt_unref_tuples(stmt);
+
 	/* Truncate region up to struct txn size. */
 	region_truncate(&txn->region, sizeof(struct txn));
 	stailq_add(&txn_cache, &txn->in_txn_cache);
@@ -322,8 +326,10 @@ txn_write_to_wal(struct txn *txn)
 	struct journal_entry *req = journal_entry_new(txn->n_new_rows +
 						      txn->n_applier_rows,
 						      &txn->region);
-	if (req == NULL)
+	if (req == NULL) {
+		txn_rollback(txn);
 		return -1;
+	}
 
 	struct txn_stmt *stmt;
 	struct xrow_header **remote_row = req->rows;
@@ -395,7 +401,7 @@ txn_commit(struct txn *txn)
 	if (txn->n_new_rows + txn->n_applier_rows > 0) {
 		txn->signature = txn_write_to_wal(txn);
 		if (txn->signature < 0)
-			goto fail;
+			return -1;
 	}
 	/*
 	 * Engine can be NULL if transaction contains IPROTO_NOP
@@ -415,15 +421,12 @@ txn_commit(struct txn *txn)
 		panic("commit trigger failed");
 	}
 
-	struct txn_stmt *stmt;
-	stailq_foreach_entry(stmt, &txn->stmts, next)
-		txn_stmt_unref_tuples(stmt);
 
 	fiber_set_txn(fiber(), NULL);
 	txn_free(txn);
 	return 0;
 fail:
-	txn_rollback();
+	txn_rollback(txn);
 	return -1;
 }
 
@@ -437,11 +440,9 @@ txn_rollback_stmt(struct txn *txn)
 }
 
 void
-txn_rollback()
+txn_rollback(struct txn *txn)
 {
-	struct txn *txn = in_txn();
-	if (txn == NULL)
-		return;
+	assert(txn == in_txn());
 	if (txn->engine)
 		engine_rollback(txn->engine, txn);
 	/* Rollback triggers must not throw. */
@@ -452,12 +453,6 @@ txn_rollback()
 		panic("rollback trigger failed");
 	}
 
-	struct txn_stmt *stmt;
-	stailq_foreach_entry(stmt, &txn->stmts, next)
-		txn_stmt_unref_tuples(stmt);
-
-	/** Free volatile txn memory. */
-	fiber_gc();
 	fiber_set_txn(fiber(), NULL);
 	txn_free(txn);
 }
@@ -533,11 +528,14 @@ int
 box_txn_rollback()
 {
 	struct txn *txn = in_txn();
+	if (txn == NULL)
+		return 0;
 	if (txn && txn->in_sub_stmt) {
 		diag_set(ClientError, ER_ROLLBACK_IN_SUB_STMT);
 		return -1;
 	}
-	txn_rollback(); /* doesn't throw */
+	txn_rollback(txn); /* doesn't throw */
+	fiber_gc();
 	return 0;
 }
 
@@ -615,6 +613,7 @@ txn_on_stop(struct trigger *trigger, void *event)
 {
 	(void) trigger;
 	(void) event;
-	txn_rollback();                 /* doesn't yield or fail */
+	txn_rollback(in_txn());                 /* doesn't yield or fail */
+
 }
 
diff --git a/src/box/txn.h b/src/box/txn.h
index ff2018d57..d211e5012 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -221,9 +221,12 @@ txn_begin();
 int
 txn_commit(struct txn *txn);
 
-/** Rollback a transaction, if any. */
+/**
+ * Rollback a transaction.
+ * @pre txn == in_txn()
+ */
 void
-txn_rollback();
+txn_rollback(struct txn *txn);
 
 /**
  * Roll back the transaction but keep the object around.
@@ -267,6 +270,7 @@ txn_on_rollback(struct txn *txn, struct trigger *trigger)
 
 /**
  * Start a new statement.
+ * Return a new statement or NULL in case of error.
  */
 struct txn_stmt *
 txn_begin_stmt(struct txn *txn, struct space *space);
diff --git a/src/box/vy_scheduler.c b/src/box/vy_scheduler.c
index 9d5f18e32..7e34ed8fc 100644
--- a/src/box/vy_scheduler.c
+++ b/src/box/vy_scheduler.c
@@ -889,18 +889,22 @@ vy_deferred_delete_batch_process_f(struct cmsg *cmsg)
 	for (int i = 0; i < batch->count; i++) {
 		if (vy_deferred_delete_process_one(deferred_delete_space,
 						   pk->space_id, pk->mem_format,
-						   &batch->stmt[i]) != 0)
-			goto fail;
+						   &batch->stmt[i]) != 0) {
+			goto fail_rollback;
+		}
 	}
 
 	if (txn_commit(txn) != 0)
 		goto fail;
 	fiber_gc();
 	return;
+
+fail_rollback:
+	txn_rollback(txn);
 fail:
 	batch->is_failed = true;
 	diag_move(diag_get(), &batch->diag);
-	txn_rollback();
+	fiber_gc();
 }
 
 /**
-- 
2.21.0

^ permalink raw reply	[flat|nested] 42+ messages in thread

* [tarantool-patches] [PATCH v3 07/14] wal: remove fiber from a journal_entry structure
  2019-06-09 20:44 [tarantool-patches] [PATCH v3 00/14] Parallel applier Georgy Kirichenko
                   ` (5 preceding siblings ...)
  2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 06/14] txn: get rid of fiber_gc from txn_rollback Georgy Kirichenko
@ 2019-06-09 20:44 ` Georgy Kirichenko
  2019-06-13 14:17   ` Vladimir Davydov
  2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 08/14] wal: enable asyncronous wal writes Georgy Kirichenko
                   ` (6 subsequent siblings)
  13 siblings, 1 reply; 42+ messages in thread
From: Georgy Kirichenko @ 2019-06-09 20:44 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

Use a fiber_cond to signal completion and wake up any waiting fibers.
This reduces the coupling between the fiber and transaction life
cycles: a journal entry no longer stores the issuing fiber.
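A minimal sketch of the completion pattern, with pthreads standing in for Tarantool's fiber_cond and illustrative names throughout: the entry carries a done flag plus a condition variable, the completing side sets the flag and broadcasts, and the waiter loops on the flag, so a spurious wakeup can no longer be mistaken for completion (the hazard the removed fiber_yield() comment warned about).

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdint.h>

/* Illustrative stand-in for the patched journal_entry fields. */
struct mock_entry {
	bool done;                 /* set once the WAL processed us */
	int64_t res;               /* write result */
	pthread_mutex_t mtx;       /* fiber_cond needs no mutex; pthreads do */
	pthread_cond_t done_cond;
};

/* The "WAL writer" side: complete the entry and broadcast. */
static void *
mock_writer_f(void *arg)
{
	struct mock_entry *e = arg;
	pthread_mutex_lock(&e->mtx);
	e->res = 42;
	e->done = true;
	pthread_cond_broadcast(&e->done_cond);
	pthread_mutex_unlock(&e->mtx);
	return NULL;
}

/*
 * The committing side: instead of a fiber_yield()/fiber_wakeup()
 * pairing, loop on the flag, like the patched wal_write() does.
 */
static int64_t
mock_wal_write(void)
{
	struct mock_entry e = {
		false, -1,
		PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER
	};
	pthread_t tid;
	pthread_create(&tid, NULL, mock_writer_f, &e);
	pthread_mutex_lock(&e.mtx);
	while (!e.done)
		pthread_cond_wait(&e.done_cond, &e.mtx);
	pthread_mutex_unlock(&e.mtx);
	pthread_join(tid, NULL);
	return e.res;
}
```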

Prerequisites: #1254
---
 src/box/box.cc    |  4 +++-
 src/box/journal.c |  7 ++++---
 src/box/journal.h |  9 +++++++--
 src/box/wal.c     | 29 +++++++++--------------------
 4 files changed, 23 insertions(+), 26 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index 5e5cd2b08..a88e762c0 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -309,9 +309,11 @@ struct recovery_journal {
  */
 static int64_t
 recovery_journal_write(struct journal *base,
-		       struct journal_entry * /* entry */)
+		       struct journal_entry *entry)
 {
 	struct recovery_journal *journal = (struct recovery_journal *) base;
+	entry->done = true;
+	fiber_cond_broadcast(&entry->done_cond);
 	return vclock_sum(journal->vclock);
 }
 
diff --git a/src/box/journal.c b/src/box/journal.c
index fe13fb6ee..6406d6f01 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -30,7 +30,6 @@
  */
 #include "journal.h"
 #include <small/region.h>
-#include <fiber.h>
 #include <diag.h>
 
 /**
@@ -41,7 +40,8 @@ static int64_t
 dummy_journal_write(struct journal *journal, struct journal_entry *entry)
 {
 	(void) journal;
-	(void) entry;
+	entry->done = true;
+	fiber_cond_broadcast(&entry->done_cond);
 	return 0;
 }
 
@@ -69,7 +69,8 @@ journal_entry_new(size_t n_rows, struct region *region)
 	entry->approx_len = 0;
 	entry->n_rows = n_rows;
 	entry->res = -1;
-	entry->fiber = fiber();
+	entry->done = false;
+	fiber_cond_create(&entry->done_cond);
 	return entry;
 }
 
diff --git a/src/box/journal.h b/src/box/journal.h
index 8ac32ee5e..618c68eb2 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -33,6 +33,7 @@
 #include <stdint.h>
 #include <stdbool.h>
 #include "salad/stailq.h"
+#include "fiber_cond.h"
 
 #if defined(__cplusplus)
 extern "C" {
@@ -55,9 +56,13 @@ struct journal_entry {
 	 */
 	int64_t res;
 	/**
-	 * The fiber issuing the request.
+	 * Turns to true when entry is processed by wal.
 	 */
-	struct fiber *fiber;
+	bool done;
+	/**
+	 * Condition to broadcast when processing is done.
+	 */
+	struct fiber_cond done_cond;
 	/**
 	 * Approximate size of this request when encoded.
 	 */
diff --git a/src/box/wal.c b/src/box/wal.c
index 0ea15a432..5951817d0 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -246,22 +246,16 @@ xlog_write_entry(struct xlog *l, struct journal_entry *entry)
 }
 
 /**
- * Invoke fibers waiting for their journal_entry's to be
- * completed. The fibers are invoked in strict fifo order:
- * this ensures that, in case of rollback, requests are
- * rolled back in strict reverse order, producing
- * a consistent database state.
+ * Signal done condition.
  */
 static void
 tx_schedule_queue(struct stailq *queue)
 {
-	/*
-	 * fiber_wakeup() is faster than fiber_call() when there
-	 * are many ready fibers.
-	 */
 	struct journal_entry *req;
-	stailq_foreach_entry(req, queue, fifo)
-		fiber_wakeup(req->fiber);
+	stailq_foreach_entry(req, queue, fifo) {
+		req->done = true;
+		fiber_cond_broadcast(&req->done_cond);
+	}
 }
 
 /**
@@ -1172,15 +1166,10 @@ wal_write(struct journal *journal, struct journal_entry *entry)
 	batch->approx_len += entry->approx_len;
 	writer->wal_pipe.n_input += entry->n_rows * XROW_IOVMAX;
 	cpipe_flush_input(&writer->wal_pipe);
-	/**
-	 * It's not safe to spuriously wakeup this fiber
-	 * since in that case it will ignore a possible
-	 * error from WAL writer and not roll back the
-	 * transaction.
-	 */
-	bool cancellable = fiber_set_cancellable(false);
-	fiber_yield(); /* Request was inserted. */
-	fiber_set_cancellable(cancellable);
+
+	while (!entry->done)
+		fiber_cond_wait(&entry->done_cond);
+
 	return entry->res;
 }
 
-- 
2.21.0

^ permalink raw reply	[flat|nested] 42+ messages in thread

* [tarantool-patches] [PATCH v3 08/14] wal: enable asyncronous wal writes
  2019-06-09 20:44 [tarantool-patches] [PATCH v3 00/14] Parallel applier Georgy Kirichenko
                   ` (6 preceding siblings ...)
  2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 07/14] wal: remove fiber from a journal_entry structure Georgy Kirichenko
@ 2019-06-09 20:44 ` Georgy Kirichenko
  2019-06-13 14:21   ` Vladimir Davydov
  2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 09/14] wal: a dedicated wal scheduling fiber Georgy Kirichenko
                   ` (5 subsequent siblings)
  13 siblings, 1 reply; 42+ messages in thread
From: Georgy Kirichenko @ 2019-06-09 20:44 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

Allow sending a journal entry to the WAL without waiting for the write
to finish. Two methods are introduced:
 * async_write emits an entry to be written and returns 0 if the
 entry was successfully scheduled;
 * async_wait blocks until the write has finished and returns the
 result of the journal write.
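The resulting API shape can be sketched as follows. This is a simplified mock, not the real src/box/journal.h; the immediate completion mimics the dummy and recovery journals in the patch. The key point is that the synchronous write method becomes just the async pair composed.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-ins for struct journal_entry / struct journal. */
struct mock_entry {
	int64_t res;   /* result of the write, -1 until completed */
	bool done;     /* set once the entry has been processed */
};

struct mock_journal {
	int64_t (*write)(struct mock_journal *, struct mock_entry *);
	int64_t (*async_write)(struct mock_journal *, struct mock_entry *);
	int64_t (*async_wait)(struct mock_journal *, struct mock_entry *);
	int64_t lsn; /* pretend log sequence number */
};

/* async_write: schedule the entry; here it completes immediately,
 * the way the dummy/recovery journals in the patch do. */
static int64_t
mock_async_write(struct mock_journal *j, struct mock_entry *e)
{
	e->res = ++j->lsn;
	e->done = true; /* a real WAL would broadcast done_cond later */
	return 0;
}

/* async_wait: block until done (a no-op here), return the result. */
static int64_t
mock_async_wait(struct mock_journal *j, struct mock_entry *e)
{
	(void)j;
	assert(e->done);
	return e->res;
}

/* The synchronous write is the async pair composed, exactly the
 * shape recovery_journal_write() takes in the patch. */
static int64_t
mock_write(struct mock_journal *j, struct mock_entry *e)
{
	if (j->async_write(j, e) == 0)
		return j->async_wait(j, e);
	return -1;
}

static int64_t
mock_commit(struct mock_journal *j)
{
	struct mock_entry e = { -1, false };
	return j->write(j, &e);
}
```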

Prerequisites: #1254
---
 src/box/box.cc    | 30 ++++++++++++++++++++---
 src/box/journal.c | 21 +++++++++++++++-
 src/box/journal.h | 30 +++++++++++++++++++++++
 src/box/wal.c     | 62 +++++++++++++++++++++++++++++++++++++++++------
 4 files changed, 130 insertions(+), 13 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index a88e762c0..d0616095b 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -308,19 +308,41 @@ struct recovery_journal {
  * min/max LSN of created LSM levels.
  */
 static int64_t
-recovery_journal_write(struct journal *base,
-		       struct journal_entry *entry)
+recovery_journal_async_write(struct journal *base,
+			     struct journal_entry *entry)
 {
 	struct recovery_journal *journal = (struct recovery_journal *) base;
+	entry->res = vclock_sum(journal->vclock);
 	entry->done = true;
 	fiber_cond_broadcast(&entry->done_cond);
-	return vclock_sum(journal->vclock);
+	return 0;
+}
+
+static int64_t
+recovery_journal_async_wait(struct journal *base,
+			    struct journal_entry *entry)
+{
+	(void) base;
+	assert(entry->done);
+	return entry->res;
+}
+
+static int64_t
+recovery_journal_write(struct journal *base,
+		       struct journal_entry *entry)
+{
+	if (recovery_journal_async_write(base, entry) == 0)
+		return recovery_journal_async_wait(base, entry);
+	return -1;
 }
 
 static inline void
 recovery_journal_create(struct recovery_journal *journal, struct vclock *v)
 {
-	journal_create(&journal->base, recovery_journal_write, NULL);
+	journal_create(&journal->base, recovery_journal_write,
+		       recovery_journal_async_write,
+		       recovery_journal_async_wait,
+		       NULL);
 	journal->vclock = v;
 }
 
diff --git a/src/box/journal.c b/src/box/journal.c
index 6406d6f01..b978e6752 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -37,16 +37,35 @@
  * but txn_commit() must work.
  */
 static int64_t
-dummy_journal_write(struct journal *journal, struct journal_entry *entry)
+dummy_async_write(struct journal *journal, struct journal_entry *entry)
 {
 	(void) journal;
+	entry->res = 0;
 	entry->done = true;
 	fiber_cond_broadcast(&entry->done_cond);
 	return 0;
 }
 
+static int64_t
+dummy_async_wait(struct journal *journal, struct journal_entry *entry)
+{
+	(void) journal;
+	assert(entry->done);
+	return entry->res;
+}
+
+static int64_t
+dummy_journal_write(struct journal *journal, struct journal_entry *entry)
+{
+	if (dummy_async_write(journal, entry) == 0)
+		return dummy_async_wait(journal, entry);
+	return -1;
+}
+
 static struct journal dummy_journal = {
 	dummy_journal_write,
+	dummy_async_write,
+	dummy_async_wait,
 	NULL,
 };
 
diff --git a/src/box/journal.h b/src/box/journal.h
index 618c68eb2..e7fe9154a 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -95,6 +95,10 @@ journal_entry_new(size_t n_rows, struct region *region);
 struct journal {
 	int64_t (*write)(struct journal *journal,
 			 struct journal_entry *req);
+	int64_t (*async_write)(struct journal *journal,
+			       struct journal_entry *req);
+	int64_t (*async_wait)(struct journal *journal,
+			      struct journal_entry *req);
 	void (*destroy)(struct journal *journal);
 };
 
@@ -116,6 +120,28 @@ journal_write(struct journal_entry *entry)
 	return current_journal->write(current_journal, entry);
 }
 
+/**
+ * Send a single entry to write.
+ *
+ * @return   0 if write was scheduled or -1 on error.
+ */
+static inline int64_t
+journal_async_write(struct journal_entry *entry)
+{
+	return current_journal->async_write(current_journal, entry);
+}
+
+/**
+ * Wait until entry processing finished.
+ * @return   a log sequence number (vclock signature) of the entry
+ *           or -1 on error.
+ */
+static inline int64_t
+journal_async_wait(struct journal_entry *entry)
+{
+	return current_journal->async_wait(current_journal, entry);
+}
+
 /**
  * Change the current implementation of the journaling API.
  * Happens during life cycle of an instance:
@@ -148,9 +174,13 @@ journal_set(struct journal *new_journal)
 static inline void
 journal_create(struct journal *journal,
 	       int64_t (*write)(struct journal *, struct journal_entry *),
+	       int64_t (*async_write)(struct journal *, struct journal_entry *),
+	       int64_t (*async_wait)(struct journal *, struct journal_entry *),
 	       void (*destroy)(struct journal *))
 {
 	journal->write = write;
+	journal->async_write = async_write,
+	journal->async_wait = async_wait,
 	journal->destroy = destroy;
 }
 
diff --git a/src/box/wal.c b/src/box/wal.c
index 5951817d0..86d021896 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -63,9 +63,18 @@ int wal_dir_lock = -1;
 static int64_t
 wal_write(struct journal *, struct journal_entry *);
 
+static int64_t
+wal_async_write(struct journal *, struct journal_entry *);
+
+static int64_t
+wal_async_wait(struct journal *, struct journal_entry *);
+
 static int64_t
 wal_write_in_wal_mode_none(struct journal *, struct journal_entry *);
 
+static int64_t
+wal_async_write_in_wal_mode_none(struct journal *, struct journal_entry *);
+
 /*
  * WAL writer - maintain a Write Ahead Log for every change
  * in the data state.
@@ -351,7 +360,10 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
 	writer->wal_max_rows = wal_max_rows;
 	writer->wal_max_size = wal_max_size;
 	journal_create(&writer->base, wal_mode == WAL_NONE ?
-		       wal_write_in_wal_mode_none : wal_write, NULL);
+		       wal_write_in_wal_mode_none : wal_write,
+		       wal_mode == WAL_NONE ?
+		       wal_async_write_in_wal_mode_none: wal_async_write,
+		       wal_async_wait, NULL);
 
 	struct xlog_opts opts = xlog_opts_default;
 	opts.sync_is_async = true;
@@ -1118,10 +1130,10 @@ wal_writer_f(va_list ap)
 
 /**
  * WAL writer main entry point: queue a single request
- * to be written to disk and wait until this task is completed.
+ * to be written to disk.
  */
-int64_t
-wal_write(struct journal *journal, struct journal_entry *entry)
+static int64_t
+wal_async_write(struct journal *journal, struct journal_entry *entry)
 {
 	struct wal_writer *writer = (struct wal_writer *) journal;
 
@@ -1138,6 +1150,8 @@ wal_write(struct journal *journal, struct journal_entry *entry)
 		say_error("Aborting transaction %llu during "
 			  "cascading rollback",
 			  vclock_sum(&writer->vclock));
+		entry->done = true;
+		fiber_cond_broadcast(&entry->done_cond);
 		return -1;
 	}
 
@@ -1152,6 +1166,8 @@ wal_write(struct journal *journal, struct journal_entry *entry)
 		if (batch == NULL) {
 			diag_set(OutOfMemory, sizeof(struct wal_msg),
 				 "region", "struct wal_msg");
+			entry->done = true;
+			fiber_cond_broadcast(&entry->done_cond);
 			return -1;
 		}
 		wal_msg_create(batch);
@@ -1166,16 +1182,34 @@ wal_write(struct journal *journal, struct journal_entry *entry)
 	batch->approx_len += entry->approx_len;
 	writer->wal_pipe.n_input += entry->n_rows * XROW_IOVMAX;
 	cpipe_flush_input(&writer->wal_pipe);
+	return 0;
+}
 
+static int64_t
+wal_async_wait(struct journal *journal, struct journal_entry *entry)
+{
+	(void) journal;
 	while (!entry->done)
 		fiber_cond_wait(&entry->done_cond);
 
 	return entry->res;
 }
 
-int64_t
-wal_write_in_wal_mode_none(struct journal *journal,
-			   struct journal_entry *entry)
+/**
+ * WAL writer main entry point: queue a single request
+ * to be written to disk and wait until this task is completed.
+ */
+static int64_t
+wal_write(struct journal *journal, struct journal_entry *entry)
+{
+	if (wal_async_write(journal, entry) != 0)
+		return -1;
+	return wal_async_wait(journal, entry);
+}
+
+static int64_t
+wal_async_write_in_wal_mode_none(struct journal *journal,
+				 struct journal_entry *entry)
 {
 	struct wal_writer *writer = (struct wal_writer *) journal;
 	struct vclock vclock_diff;
@@ -1184,7 +1218,19 @@ wal_write_in_wal_mode_none(struct journal *journal,
 		       entry->rows + entry->n_rows);
 	vclock_merge(&writer->vclock, &vclock_diff);
 	vclock_copy(&replicaset.vclock, &writer->vclock);
-	return vclock_sum(&writer->vclock);
+	entry->res = vclock_sum(&writer->vclock);
+	entry->done = true;
+	fiber_cond_broadcast(&entry->done_cond);
+	return 0;
+}
+
+static int64_t
+wal_write_in_wal_mode_none(struct journal *journal,
+			   struct journal_entry *entry)
+{
+	if (wal_async_write_in_wal_mode_none(journal, entry) != 0)
+		return -1;
+	return wal_async_wait(journal, entry);
 }
 
 void
-- 
2.21.0

^ permalink raw reply	[flat|nested] 42+ messages in thread

* [tarantool-patches] [PATCH v3 09/14] wal: a dedicated wal scheduling fiber
  2019-06-09 20:44 [tarantool-patches] [PATCH v3 00/14] Parallel applier Georgy Kirichenko
                   ` (7 preceding siblings ...)
  2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 08/14] wal: enable asyncronous wal writes Georgy Kirichenko
@ 2019-06-09 20:44 ` Georgy Kirichenko
  2019-06-13 14:24   ` Vladimir Davydov
  2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 10/14] core: latch_unlock_external routine Georgy Kirichenko
                   ` (4 subsequent siblings)
  13 siblings, 1 reply; 42+ messages in thread
From: Georgy Kirichenko @ 2019-06-09 20:44 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

In order to implement asynchronous transactions we need to run
transaction completion handlers from a context that is allowed to
yield, which the tx_prio endpoint is not. Introduce a dedicated
tx-side fiber that drains a scheduling queue of completed journal
entries.
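The handoff can be modeled with a tiny intrusive queue (hypothetical names, no real fiber machinery): the WAL callback only appends completed entries and signals, while the dedicated fiber drains the queue and runs completion logic in a context that may yield.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Illustrative stand-ins for journal_entry and the writer's queue. */
struct sched_entry {
	bool done;
	struct sched_entry *next;
};

struct scheduler {
	struct sched_entry *head, *tail;
};

/* tx_schedule_queue() analogue: just enqueue; the real code also
 * signals schedule_cond. No completion work runs in this context. */
static void
sched_push(struct scheduler *s, struct sched_entry *e)
{
	e->next = NULL;
	if (s->tail != NULL)
		s->tail->next = e;
	else
		s->head = e;
	s->tail = e;
}

/* One iteration of the tx_schedule_f() loop: drain the queue in FIFO
 * order, marking entries done (the real fiber also broadcasts each
 * entry's done_cond). Returns the number of entries processed. */
static int
sched_drain(struct scheduler *s)
{
	int n = 0;
	while (s->head != NULL) {
		struct sched_entry *e = s->head;
		s->head = e->next;
		e->done = true;
		n++;
	}
	s->tail = NULL;
	return n;
}
```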

Prerequisites: #1254
---
 src/box/wal.c | 48 +++++++++++++++++++++++++++++++++++++++++-------
 1 file changed, 41 insertions(+), 7 deletions(-)

diff --git a/src/box/wal.c b/src/box/wal.c
index 86d021896..e868a8e71 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -100,6 +100,12 @@ struct wal_writer
 	struct cpipe wal_pipe;
 	/** A memory pool for messages. */
 	struct mempool msg_pool;
+	/** A queue to schedule journal entry completions. */
+	struct stailq schedule_queue;
+	/** True if writer is in rollback state. */
+	bool is_in_rollback;
+	/** A condition to signal about new schedule queue entries. */
+	struct fiber_cond schedule_cond;
 	/* ----------------- wal ------------------- */
 	/** A setting from instance configuration - rows_per_wal */
 	int64_t wal_max_rows;
@@ -254,17 +260,36 @@ xlog_write_entry(struct xlog *l, struct journal_entry *entry)
 	return xlog_tx_commit(l);
 }
 
+/*
+ * Tx schedule fiber function.
+ */
+static int
+tx_schedule_f(va_list ap)
+{
+	struct wal_writer *writer = va_arg(ap, struct wal_writer *);
+	while (!fiber_is_cancelled()) {
+		while (!stailq_empty(&writer->schedule_queue)) {
+			struct journal_entry *req =
+				stailq_shift_entry(&writer->schedule_queue,
+						   struct journal_entry, fifo);
+			req->done = true;
+			fiber_cond_broadcast(&req->done_cond);
+		}
+		writer->is_in_rollback = false;
+		fiber_cond_wait(&writer->schedule_cond);
+	}
+	return 0;
+}
+
 /**
- * Signal done condition.
+ * Attach requests to a scheduling queue.
  */
 static void
 tx_schedule_queue(struct stailq *queue)
 {
-	struct journal_entry *req;
-	stailq_foreach_entry(req, queue, fifo) {
-		req->done = true;
-		fiber_cond_broadcast(&req->done_cond);
-	}
+	struct wal_writer *writer = &wal_writer_singleton;
+	stailq_concat(&writer->schedule_queue, queue);
+	fiber_cond_signal(&writer->schedule_cond);
 }
 
 /**
@@ -309,6 +334,8 @@ tx_schedule_rollback(struct cmsg *msg)
 	/* Must not yield. */
 	tx_schedule_queue(&writer->rollback);
 	stailq_create(&writer->rollback);
+	writer->is_in_rollback = true;
+
 	if (msg != &writer->in_rollback)
 		mempool_free(&writer->msg_pool,
 			     container_of(msg, struct wal_msg, base));
@@ -359,6 +386,7 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
 	writer->wal_mode = wal_mode;
 	writer->wal_max_rows = wal_max_rows;
 	writer->wal_max_size = wal_max_size;
+	writer->is_in_rollback = false;
 	journal_create(&writer->base, wal_mode == WAL_NONE ?
 		       wal_write_in_wal_mode_none : wal_write,
 		       wal_mode == WAL_NONE ?
@@ -374,6 +402,12 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
 
 	stailq_create(&writer->rollback);
 	cmsg_init(&writer->in_rollback, NULL);
+	stailq_create(&writer->schedule_queue);
+	fiber_cond_create(&writer->schedule_cond);
+	struct fiber *schedule_fiber = fiber_new("tx_schedule", tx_schedule_f);
+	if (schedule_fiber == NULL)
+		panic("Could not create schedule fiber");
+	fiber_start(schedule_fiber, writer);
 
 	writer->checkpoint_wal_size = 0;
 	writer->checkpoint_threshold = INT64_MAX;
@@ -1139,7 +1173,7 @@ wal_async_write(struct journal *journal, struct journal_entry *entry)
 
 	ERROR_INJECT_RETURN(ERRINJ_WAL_IO);
 
-	if (! stailq_empty(&writer->rollback)) {
+	if (writer->is_in_rollback) {
 		/*
 		 * The writer rollback queue is not empty,
 		 * roll back this transaction immediately.
-- 
2.21.0

^ permalink raw reply	[flat|nested] 42+ messages in thread

* [tarantool-patches] [PATCH v3 10/14] core: latch_unlock_external routine
  2019-06-09 20:44 [tarantool-patches] [PATCH v3 00/14] Parallel applier Georgy Kirichenko
                   ` (8 preceding siblings ...)
  2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 09/14] wal: a dedicated wal scheduling fiber Georgy Kirichenko
@ 2019-06-09 20:44 ` Georgy Kirichenko
  2019-06-13 14:27   ` Vladimir Davydov
  2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 11/14] txn: introduce asynchronous txn commit Georgy Kirichenko
                   ` (3 subsequent siblings)
  13 siblings, 1 reply; 42+ messages in thread
From: Georgy Kirichenko @ 2019-06-09 20:44 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

Allow unlocking a latch from a fiber which is not the owner of the
latch. This is required to process transaction triggers asynchronously.
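A stripped-down model of the split, with integer ids standing in for struct fiber pointers (not the real src/lib/core/latch.h): latch_unlock_external() drops the owner assertion, and the ordinary latch_unlock() keeps its precondition by delegating to it.

```c
#include <assert.h>

/* Hypothetical latch model; owner_id == 0 means unlocked. */
struct mock_latch {
	int owner_id;
};

static void
mock_latch_lock(struct mock_latch *l, int fiber_id)
{
	assert(l->owner_id == 0); /* real code would queue and yield */
	l->owner_id = fiber_id;
}

/* Callable from any fiber, e.g. a DDL commit trigger that runs in the
 * tx_schedule fiber rather than the fiber that took the lock. */
static void
mock_latch_unlock_external(struct mock_latch *l)
{
	l->owner_id = 0; /* real code also wakes the first waiter */
}

/* The original API keeps its ownership precondition. */
static void
mock_latch_unlock(struct mock_latch *l, int self_id)
{
	assert(l->owner_id == self_id);
	mock_latch_unlock_external(l);
}
```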

Prerequisites: #1254
---
 src/box/alter.cc     |  2 +-
 src/lib/core/latch.h | 13 +++++++++++--
 2 files changed, 12 insertions(+), 3 deletions(-)

diff --git a/src/box/alter.cc b/src/box/alter.cc
index 671209b51..adf30c9b6 100644
--- a/src/box/alter.cc
+++ b/src/box/alter.cc
@@ -3446,7 +3446,7 @@ unlock_after_dd(struct trigger *trigger, void *event)
 {
 	(void) trigger;
 	(void) event;
-	latch_unlock(&schema_lock);
+	latch_unlock_external(&schema_lock);
 }
 
 static void
diff --git a/src/lib/core/latch.h b/src/lib/core/latch.h
index 49c59cf63..b27d6dd08 100644
--- a/src/lib/core/latch.h
+++ b/src/lib/core/latch.h
@@ -159,9 +159,8 @@ latch_trylock(struct latch *l)
  * \copydoc box_latch_unlock
  */
 static inline void
-latch_unlock(struct latch *l)
+latch_unlock_external(struct latch *l)
 {
-	assert(l->owner == fiber());
 	l->owner = NULL;
 	if (!rlist_empty(&l->queue)) {
 		struct fiber *f = rlist_first_entry(&l->queue,
@@ -176,6 +175,16 @@ latch_unlock(struct latch *l)
 	}
 }
 
+/**
+ * \copydoc box_latch_unlock
+ */
+static inline void
+latch_unlock(struct latch *l)
+{
+	assert(l->owner == fiber());
+	latch_unlock_external(l);
+}
+
 /** \cond public */
 
 /**
-- 
2.21.0

^ permalink raw reply	[flat|nested] 42+ messages in thread

* [tarantool-patches] [PATCH v3 11/14] txn: introduce asynchronous txn commit
  2019-06-09 20:44 [tarantool-patches] [PATCH v3 00/14] Parallel applier Georgy Kirichenko
                   ` (9 preceding siblings ...)
  2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 10/14] core: latch_unlock_external routine Georgy Kirichenko
@ 2019-06-09 20:44 ` Georgy Kirichenko
  2019-06-13 14:34   ` Vladimir Davydov
  2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 12/14] txn: handle fiber stop event at transaction level Georgy Kirichenko
                   ` (2 subsequent siblings)
  13 siblings, 1 reply; 42+ messages in thread
From: Georgy Kirichenko @ 2019-06-09 20:44 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

Allow asynchronous transaction commit. This adds two functions:
 * txn_write, which sends a transaction to the journal;
 * txn_wait, which waits until transaction processing is done.

Prerequisites: #1254
---
 src/box/box.cc    |  24 ++---
 src/box/journal.c |  22 +++--
 src/box/journal.h |  40 +++++----
 src/box/txn.c     | 220 +++++++++++++++++++++++++++++++---------------
 src/box/txn.h     |  12 +++
 src/box/vy_log.c  |   2 +-
 src/box/wal.c     |   8 ++
 7 files changed, 221 insertions(+), 107 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index d0616095b..510f3fc99 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -308,19 +308,21 @@ struct recovery_journal {
  * min/max LSN of created LSM levels.
  */
 static int64_t
-recovery_journal_async_write(struct journal *base,
-			     struct journal_entry *entry)
+recovery_journal_write(struct journal *base,
+		       struct journal_entry *entry)
 {
 	struct recovery_journal *journal = (struct recovery_journal *) base;
 	entry->res = vclock_sum(journal->vclock);
+	if (entry->on_done_cb)
+		entry->on_done_cb(entry, entry->on_done_cb_data);
 	entry->done = true;
 	fiber_cond_broadcast(&entry->done_cond);
 	return 0;
 }
 
 static int64_t
-recovery_journal_async_wait(struct journal *base,
-			    struct journal_entry *entry)
+recovery_journal_wait(struct journal *base,
+		      struct journal_entry *entry)
 {
 	(void) base;
 	assert(entry->done);
@@ -328,20 +330,20 @@ recovery_journal_async_wait(struct journal *base,
 }
 
 static int64_t
-recovery_journal_write(struct journal *base,
-		       struct journal_entry *entry)
+recovery_journal_write_sync(struct journal *base,
+			    struct journal_entry *entry)
 {
-	if (recovery_journal_async_write(base, entry) == 0)
-		return recovery_journal_async_wait(base, entry);
+	if (recovery_journal_write(base, entry) == 0)
+		return recovery_journal_wait(base, entry);
 	return -1;
 }
 
 static inline void
 recovery_journal_create(struct recovery_journal *journal, struct vclock *v)
 {
-	journal_create(&journal->base, recovery_journal_write,
-		       recovery_journal_async_write,
-		       recovery_journal_async_wait,
+	journal_create(&journal->base, recovery_journal_write_sync,
+		       recovery_journal_write,
+		       recovery_journal_wait,
 		       NULL);
 	journal->vclock = v;
 }
diff --git a/src/box/journal.c b/src/box/journal.c
index b978e6752..dadff771e 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -37,17 +37,19 @@
  * but txn_commit() must work.
  */
 static int64_t
-dummy_async_write(struct journal *journal, struct journal_entry *entry)
+dummy_journal_write(struct journal *journal, struct journal_entry *entry)
 {
 	(void) journal;
 	entry->res = 0;
+	if (entry->on_done_cb)
+		entry->on_done_cb(entry, entry->on_done_cb_data);
 	entry->done = true;
 	fiber_cond_broadcast(&entry->done_cond);
 	return 0;
 }
 
 static int64_t
-dummy_async_wait(struct journal *journal, struct journal_entry *entry)
+dummy_journal_wait(struct journal *journal, struct journal_entry *entry)
 {
 	(void) journal;
 	assert(entry->done);
@@ -55,24 +57,26 @@ dummy_async_wait(struct journal *journal, struct journal_entry *entry)
 }
 
 static int64_t
-dummy_journal_write(struct journal *journal, struct journal_entry *entry)
+dummy_journal_write_sync(struct journal *journal, struct journal_entry *entry)
 {
-	if (dummy_async_write(journal, entry) == 0)
-		return dummy_async_wait(journal, entry);
+	if (dummy_journal_write(journal, entry) == 0)
+		return dummy_journal_wait(journal, entry);
 	return -1;
 }
 
 static struct journal dummy_journal = {
+	dummy_journal_write_sync,
 	dummy_journal_write,
-	dummy_async_write,
-	dummy_async_wait,
+	dummy_journal_wait,
 	NULL,
 };
 
 struct journal *current_journal = &dummy_journal;
 
 struct journal_entry *
-journal_entry_new(size_t n_rows, struct region *region)
+journal_entry_new(size_t n_rows, struct region *region,
+		  void (*on_done_cb)(struct journal_entry *entry, void *data),
+		  void *on_done_cb_data)
 {
 	struct journal_entry *entry;
 
@@ -90,6 +94,8 @@ journal_entry_new(size_t n_rows, struct region *region)
 	entry->res = -1;
 	entry->done = false;
 	fiber_cond_create(&entry->done_cond);
+	entry->on_done_cb = on_done_cb;
+	entry->on_done_cb_data = on_done_cb_data;
 	return entry;
 }
 
diff --git a/src/box/journal.h b/src/box/journal.h
index e7fe9154a..c7e467969 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -63,6 +63,14 @@ struct journal_entry {
 	 * Condition to broadcast when processing is done.
 	 */
 	struct fiber_cond done_cond;
+	/**
+	 * A journal entry completion callback.
+	 */
+	void (*on_done_cb)(struct journal_entry *entry, void *data);
+	/**
+	 * A journal entry completion callback argument.
+	 */
+	void *on_done_cb_data;
 	/**
 	 * Approximate size of this request when encoded.
 	 */
@@ -85,7 +93,9 @@ struct region;
  * @return NULL if out of memory, fiber diagnostics area is set
  */
 struct journal_entry *
-journal_entry_new(size_t n_rows, struct region *region);
+journal_entry_new(size_t n_rows, struct region *region,
+		  void (*on_done_cb)(struct journal_entry *entry, void *data),
+		  void *on_done_cb_data);
 
 /**
  * An API for an abstract journal for all transactions of this
@@ -93,12 +103,12 @@ journal_entry_new(size_t n_rows, struct region *region);
  * synchronous replication.
  */
 struct journal {
+	int64_t (*write_sync)(struct journal *journal,
+			      struct journal_entry *req);
 	int64_t (*write)(struct journal *journal,
 			 struct journal_entry *req);
-	int64_t (*async_write)(struct journal *journal,
-			       struct journal_entry *req);
-	int64_t (*async_wait)(struct journal *journal,
-			      struct journal_entry *req);
+	int64_t (*wait)(struct journal *journal,
+			struct journal_entry *req);
 	void (*destroy)(struct journal *journal);
 };
 
@@ -115,9 +125,9 @@ extern struct journal *current_journal;
  *           or -1 on error.
  */
 static inline int64_t
-journal_write(struct journal_entry *entry)
+journal_write_sync(struct journal_entry *entry)
 {
-	return current_journal->write(current_journal, entry);
+	return current_journal->write_sync(current_journal, entry);
 }
 
 /**
@@ -126,9 +136,9 @@ journal_write(struct journal_entry *entry)
  * @return   0 if write was scheduled or -1 on error.
  */
 static inline int64_t
-journal_async_write(struct journal_entry *entry)
+journal_write(struct journal_entry *entry)
 {
-	return current_journal->async_write(current_journal, entry);
+	return current_journal->write(current_journal, entry);
 }
 
 /**
@@ -137,9 +147,9 @@ journal_async_write(struct journal_entry *entry)
  *           or -1 on error.
  */
 static inline int64_t
-journal_async_wait(struct journal_entry *entry)
+journal_wait(struct journal_entry *entry)
 {
-	return current_journal->async_wait(current_journal, entry);
+	return current_journal->wait(current_journal, entry);
 }
 
 /**
@@ -173,14 +183,14 @@ journal_set(struct journal *new_journal)
 
 static inline void
 journal_create(struct journal *journal,
+	       int64_t (*write_sync)(struct journal *, struct journal_entry *),
 	       int64_t (*write)(struct journal *, struct journal_entry *),
-	       int64_t (*async_write)(struct journal *, struct journal_entry *),
-	       int64_t (*async_wait)(struct journal *, struct journal_entry *),
+	       int64_t (*wait)(struct journal *, struct journal_entry *),
 	       void (*destroy)(struct journal *))
 {
+	journal->write_sync = write_sync;
 	journal->write = write;
-	journal->async_write = async_write,
-	journal->async_wait = async_wait,
+	journal->wait = wait;
 	journal->destroy = destroy;
 }
 
diff --git a/src/box/txn.c b/src/box/txn.c
index a08652af1..815d635fe 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -199,6 +199,7 @@ txn_begin()
 	txn->engine = NULL;
 	txn->engine_tx = NULL;
 	txn->psql_txn = NULL;
+	txn->entry = NULL;
 	/* fiber_on_yield/fiber_on_stop initialized by engine on demand */
 	fiber_set_txn(fiber(), txn);
 	return txn;
@@ -318,22 +319,77 @@ fail:
 	return -1;
 }
 
+/**
+ * Complete transaction processing.
+ */
+static void
+txn_complete(struct txn *txn)
+{
+	if (txn->signature < 0) {
+		if (txn->engine)
+			engine_rollback(txn->engine, txn);
+		/* Rollback triggers must not throw. */
+		fiber_set_txn(fiber(), txn);
+		if (txn->has_triggers &&
+		    trigger_run(&txn->on_rollback, txn) != 0) {
+			diag_log();
+			unreachable();
+			panic("rollback trigger failed");
+		}
+		fiber_set_txn(fiber(), NULL);
+
+		return;
+	}
+	/*
+	 * Engine can be NULL if transaction contains IPROTO_NOP
+	 * statements only.
+	 */
+	if (txn->engine != NULL)
+		engine_commit(txn->engine, txn);
+	/*
+	 * The transaction is in the binary log. No action below
+	 * may throw. In case an error has happened, there is
+	 * no other option but terminate.
+	 */
+	fiber_set_txn(fiber(), txn);
+	if (txn->has_triggers &&
+	    trigger_run(&txn->on_commit, txn) != 0) {
+		diag_log();
+		unreachable();
+		panic("commit trigger failed");
+	}
+
+	fiber_set_txn(fiber(), NULL);
+}
+
+static void
+txn_entry_done_cb(struct journal_entry *entry, void *data)
+{
+	struct txn *txn = (struct txn *)data;
+	assert(txn->entry == entry);
+	txn->signature = entry->res;
+	txn_complete(txn);
+}
+
 static int64_t
-txn_write_to_wal(struct txn *txn)
+txn_journal_write(struct txn *txn)
 {
+	assert(txn->entry == NULL);
 	assert(txn->n_new_rows + txn->n_applier_rows > 0);
 
-	struct journal_entry *req = journal_entry_new(txn->n_new_rows +
-						      txn->n_applier_rows,
-						      &txn->region);
-	if (req == NULL) {
+	/* Prepare a journal entry. */
+	txn->entry = journal_entry_new(txn->n_new_rows +
+				       txn->n_applier_rows,
+				       &txn->region,
+				       txn_entry_done_cb, txn);
+	if (txn->entry == NULL) {
 		txn_rollback(txn);
 		return -1;
 	}
 
 	struct txn_stmt *stmt;
-	struct xrow_header **remote_row = req->rows;
-	struct xrow_header **local_row = req->rows + txn->n_applier_rows;
+	struct xrow_header **remote_row = txn->entry->rows;
+	struct xrow_header **local_row = txn->entry->rows + txn->n_applier_rows;
 	stailq_foreach_entry(stmt, &txn->stmts, next) {
 		if (stmt->row == NULL)
 			continue; /* A read (e.g. select) request */
@@ -341,42 +397,39 @@ txn_write_to_wal(struct txn *txn)
 			*local_row++ = stmt->row;
 		else
 			*remote_row++ = stmt->row;
-		req->approx_len += xrow_approx_len(stmt->row);
+		txn->entry->approx_len += xrow_approx_len(stmt->row);
 	}
-	assert(remote_row == req->rows + txn->n_applier_rows);
+	assert(remote_row == txn->entry->rows + txn->n_applier_rows);
 	assert(local_row == remote_row + txn->n_new_rows);
 
-	ev_tstamp start = ev_monotonic_now(loop());
-	int64_t res = journal_write(req);
-	ev_tstamp stop = ev_monotonic_now(loop());
-
-	if (res < 0) {
-		/* Cascading rollback. */
-		txn_rollback(txn); /* Perform our part of cascading rollback. */
-		/*
-		 * Move fiber to end of event loop to avoid
-		 * execution of any new requests before all
-		 * pending rollbacks are processed.
-		 */
-		fiber_reschedule();
+	/* Send entry to a journal. */
+	if (journal_write(txn->entry) < 0) {
 		diag_set(ClientError, ER_WAL_IO);
-		diag_log();
-	} else if (stop - start > too_long_threshold) {
-		int n_rows = txn->n_new_rows + txn->n_applier_rows;
-		say_warn_ratelimited("too long WAL write: %d rows at "
-				     "LSN %lld: %.3f sec", n_rows,
-				     res - n_rows + 1, stop - start);
+		return -1;
 	}
-	/*
-	 * Use vclock_sum() from WAL writer as transaction signature.
-	 */
-	return res;
+	return 0;
 }
 
-int
-txn_commit(struct txn *txn)
+/*
+ * Wait until journal processing has finished.
+ */
+static int64_t
+txn_journal_wait(struct txn *txn)
+{
+	assert(txn->entry != NULL);
+	int64_t signature = journal_wait(txn->entry);
+	assert(signature == txn->signature);
+	if (signature < 0)
+		diag_set(ClientError, ER_WAL_IO);
+	return signature;
+}
+
+/*
+ * Prepare a transaction using engines.
+ */
+static int
+txn_prepare(struct txn *txn)
 {
-	assert(txn == in_txn());
 	/*
 	 * If transaction has been started in SQL, deferred
 	 * foreign key constraints must not be violated.
@@ -386,7 +439,7 @@ txn_commit(struct txn *txn)
 		struct sql_txn *sql_txn = txn->psql_txn;
 		if (sql_txn->fk_deferred_count != 0) {
 			diag_set(ClientError, ER_FOREIGN_KEY_CONSTRAINT);
-			goto fail;
+			return -1;
 		}
 	}
 	/*
@@ -394,42 +447,75 @@ txn_commit(struct txn *txn)
 	 * we have a bunch of IPROTO_NOP statements.
 	 */
 	if (txn->engine != NULL) {
-		if (engine_prepare(txn->engine, txn) != 0)
-			goto fail;
+		if (engine_prepare(txn->engine, txn) != 0) {
+			return -1;
+		}
 	}
+	return 0;
+}
 
-	if (txn->n_new_rows + txn->n_applier_rows > 0) {
-		txn->signature = txn_write_to_wal(txn);
-		if (txn->signature < 0)
-			return -1;
+/*
+ * Send a transaction to a journal.
+ */
+int
+txn_write(struct txn *txn)
+{
+	if (txn_prepare(txn) != 0)
+		goto fail;
+
+	txn->start_tm = ev_monotonic_now(loop());
+	if (txn->n_new_rows + txn->n_applier_rows == 0) {
+		/* Nothing to do. */
+		txn->signature = 0;
+		txn_complete(txn);
+		return 0;
 	}
-	/*
-	 * Engine can be NULL if transaction contains IPROTO_NOP
-	 * statements only.
-	 */
-	if (txn->engine != NULL)
-		engine_commit(txn->engine, txn);
-	/*
-	 * The transaction is in the binary log. No action below
-	 * may throw. In case an error has happened, there is
-	 * no other option but terminate.
-	 */
-	if (txn->has_triggers &&
-	    trigger_run(&txn->on_commit, txn) != 0) {
-		diag_log();
-		unreachable();
-		panic("commit trigger failed");
+
+	if (txn_journal_write(txn) != 0)
+		goto fail;
+	fiber_set_txn(fiber(), NULL);
+	return 0;
+fail:
+	txn_rollback(txn);
+	return -1;
+}
+
+/*
+ * Wait until transaction processing has finished.
+ */
+int
+txn_wait(struct txn *txn)
+{
+	if (txn->n_new_rows + txn->n_applier_rows > 0 &&
+	    txn_journal_wait(txn) < 0)
+		goto fail;
+	ev_tstamp stop_tm = ev_monotonic_now(loop());
+	if (stop_tm - txn->start_tm > too_long_threshold) {
+		int n_rows = txn->n_new_rows + txn->n_applier_rows;
+		say_warn_ratelimited("too long WAL write: %d rows at "
+				     "LSN %lld: %.3f sec", n_rows,
+				     txn->signature - n_rows + 1,
+				     stop_tm - txn->start_tm);
 	}
 
 
-	fiber_set_txn(fiber(), NULL);
 	txn_free(txn);
 	return 0;
+
 fail:
-	txn_rollback(txn);
+	txn_free(txn);
 	return -1;
 }
 
+int
+txn_commit(struct txn *txn)
+{
+	if (txn_write(txn) != 0 ||
+	    txn_wait(txn) < 0)
+		return -1;
+	return 0;
+}
+
 void
 txn_rollback_stmt(struct txn *txn)
 {
@@ -442,18 +528,8 @@ txn_rollback_stmt(struct txn *txn)
 void
 txn_rollback(struct txn *txn)
 {
-	assert(txn == in_txn());
-	if (txn->engine)
-		engine_rollback(txn->engine, txn);
-	/* Rollback triggers must not throw. */
-	if (txn->has_triggers &&
-	    trigger_run(&txn->on_rollback, txn) != 0) {
-		diag_log();
-		unreachable();
-		panic("rollback trigger failed");
-	}
-
-	fiber_set_txn(fiber(), NULL);
+	txn->signature = -1;
+	txn_complete(txn);
 	txn_free(txn);
 }
 
diff --git a/src/box/txn.h b/src/box/txn.h
index d211e5012..84d4f27d3 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -195,6 +195,12 @@ struct txn {
 	 /** Commit and rollback triggers */
 	struct rlist on_commit, on_rollback;
 	struct sql_txn *psql_txn;
+	/** Journal entry to control txn write. */
+	struct journal_entry *entry;
+	/** Transaction completion trigger. */
+	struct trigger entry_done;
+	/** Timestamp of entry write start. */
+	ev_tstamp start_tm;
 };
 
 /* Pointer to the current transaction (if any) */
@@ -228,6 +234,12 @@ txn_commit(struct txn *txn);
 void
 txn_rollback(struct txn *txn);
 
+int
+txn_write(struct txn *txn);
+
+int
+txn_wait(struct txn *txn);
+
 /**
  * Roll back the transaction but keep the object around.
  * A special case for memtx transaction abort on yield. In this
diff --git a/src/box/vy_log.c b/src/box/vy_log.c
index bdc1cfa31..d37d44011 100644
--- a/src/box/vy_log.c
+++ b/src/box/vy_log.c
@@ -772,7 +772,7 @@ vy_log_flush(void)
 		tx_size++;
 
 	size_t used = region_used(&fiber()->gc);
-	struct journal_entry *entry = journal_entry_new(tx_size, &fiber()->gc);
+	struct journal_entry *entry = journal_entry_new(tx_size, &fiber()->gc, NULL, NULL);
 	if (entry == NULL)
 		goto err;
 
diff --git a/src/box/wal.c b/src/box/wal.c
index e868a8e71..eff48b4fe 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -272,6 +272,8 @@ tx_schedule_f(va_list ap)
 			struct journal_entry *req =
 				stailq_shift_entry(&writer->schedule_queue,
 						   struct journal_entry, fifo);
+			if (req->on_done_cb != NULL)
+				req->on_done_cb(req, req->on_done_cb_data);
 			req->done = true;
 			fiber_cond_broadcast(&req->done_cond);
 		}
@@ -1184,6 +1186,8 @@ wal_async_write(struct journal *journal, struct journal_entry *entry)
 		say_error("Aborting transaction %llu during "
 			  "cascading rollback",
 			  vclock_sum(&writer->vclock));
+		if (entry->on_done_cb)
+			entry->on_done_cb(entry, entry->on_done_cb_data);
 		entry->done = true;
 		fiber_cond_broadcast(&entry->done_cond);
 		return -1;
@@ -1200,6 +1204,8 @@ wal_async_write(struct journal *journal, struct journal_entry *entry)
 		if (batch == NULL) {
 			diag_set(OutOfMemory, sizeof(struct wal_msg),
 				 "region", "struct wal_msg");
+			if (entry->on_done_cb)
+				entry->on_done_cb(entry, entry->on_done_cb_data);
 			entry->done = true;
 			fiber_cond_broadcast(&entry->done_cond);
 			return -1;
@@ -1253,6 +1259,8 @@ wal_async_write_in_wal_mode_none(struct journal *journal,
 	vclock_merge(&writer->vclock, &vclock_diff);
 	vclock_copy(&replicaset.vclock, &writer->vclock);
 	entry->res = vclock_sum(&writer->vclock);
+	if (entry->on_done_cb)
+		entry->on_done_cb(entry, entry->on_done_cb_data);
 	entry->done = true;
 	fiber_cond_broadcast(&entry->done_cond);
 	return 0;
-- 
2.21.0

^ permalink raw reply	[flat|nested] 42+ messages in thread

* [tarantool-patches] [PATCH v3 12/14] txn: handle fiber stop event at transaction level
  2019-06-09 20:44 [tarantool-patches] [PATCH v3 00/14] Parallel applier Georgy Kirichenko
                   ` (10 preceding siblings ...)
  2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 11/14] txn: introduce asynchronous txn commit Georgy Kirichenko
@ 2019-06-09 20:44 ` Georgy Kirichenko
  2019-06-13 14:36   ` Vladimir Davydov
  2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 13/14] applier: apply transaction in parallel Georgy Kirichenko
  2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 14/14] test: fix flaky test Georgy Kirichenko
  13 siblings, 1 reply; 42+ messages in thread
From: Georgy Kirichenko @ 2019-06-09 20:44 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

Get rid of the duplicated fiber on-stop logic by handling the event at the transaction level.

Prerequisites: #1254
---
 src/box/memtx_engine.c | 5 -----
 src/box/txn.c          | 5 ++++-
 src/box/vinyl.c        | 4 ----
 3 files changed, 4 insertions(+), 10 deletions(-)

diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index 918885318..f371d147f 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -94,15 +94,12 @@ memtx_init_txn(struct txn *txn)
 
 	trigger_create(&txn->fiber_on_yield, txn_on_yield,
 		       NULL, NULL);
-	trigger_create(&txn->fiber_on_stop, txn_on_stop,
-		       NULL, NULL);
 	/*
 	 * Memtx doesn't allow yields between statements of
 	 * a transaction. Set a trigger which would roll
 	 * back the transaction if there is a yield.
 	 */
 	trigger_add(&fiber->on_yield, &txn->fiber_on_yield);
-	trigger_add(&fiber->on_stop, &txn->fiber_on_stop);
 	/*
 	 * This serves as a marker that the triggers are
 	 * initialized.
@@ -386,7 +383,6 @@ memtx_engine_prepare(struct engine *engine, struct txn *txn)
 	 * on calls to trigger_create/trigger_clear.
 	 */
 	trigger_clear(&txn->fiber_on_yield);
-	trigger_clear(&txn->fiber_on_stop);
 	if (txn->is_aborted) {
 		diag_set(ClientError, ER_TRANSACTION_YIELD);
 		diag_log();
@@ -465,7 +461,6 @@ memtx_engine_rollback(struct engine *engine, struct txn *txn)
 {
 	if (txn->engine_tx != NULL) {
 		trigger_clear(&txn->fiber_on_yield);
-		trigger_clear(&txn->fiber_on_stop);
 	}
 	struct txn_stmt *stmt;
 	stailq_reverse(&txn->stmts);
diff --git a/src/box/txn.c b/src/box/txn.c
index 815d635fe..5ceae0da1 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -202,6 +202,8 @@ txn_begin()
 	txn->entry = NULL;
 	/* fiber_on_yield/fiber_on_stop initialized by engine on demand */
 	fiber_set_txn(fiber(), txn);
+	trigger_create(&txn->fiber_on_stop, txn_on_stop, NULL, NULL);
+	trigger_add(&fiber()->on_stop, &txn->fiber_on_stop);
 	return txn;
 }
 
@@ -462,7 +464,7 @@ txn_write(struct txn *txn)
 {
 	if (txn_prepare(txn) != 0)
 		goto fail;
-
+	trigger_clear(&txn->fiber_on_stop);
 	txn->start_tm = ev_monotonic_now(loop());
 	if (txn->n_new_rows + txn->n_applier_rows == 0) {
 		/* Nothing to do. */
@@ -528,6 +530,7 @@ txn_rollback_stmt(struct txn *txn)
 void
 txn_rollback(struct txn *txn)
 {
+	trigger_clear(&txn->fiber_on_stop);
 	txn->signature = -1;
 	txn_complete(txn);
 	txn_free(txn);
diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index c61f9477e..96c1ab1c8 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -2414,8 +2414,6 @@ vinyl_engine_begin(struct engine *engine, struct txn *txn)
 	txn->engine_tx = vy_tx_begin(env->xm);
 	if (txn->engine_tx == NULL)
 		return -1;
-	trigger_create(&txn->fiber_on_stop, txn_on_stop, NULL, NULL);
-	trigger_add(&fiber()->on_stop, &txn->fiber_on_stop);
 	return 0;
 }
 
@@ -2485,7 +2483,6 @@ vinyl_engine_commit(struct engine *engine, struct txn *txn)
 	vy_regulator_check_dump_watermark(&env->regulator);
 
 	txn->engine_tx = NULL;
-	trigger_clear(&txn->fiber_on_stop);
 }
 
 static void
@@ -2499,7 +2496,6 @@ vinyl_engine_rollback(struct engine *engine, struct txn *txn)
 	vy_tx_rollback(tx);
 
 	txn->engine_tx = NULL;
-	trigger_clear(&txn->fiber_on_stop);
 }
 
 static int
-- 
2.21.0

^ permalink raw reply	[flat|nested] 42+ messages in thread

* [tarantool-patches] [PATCH v3 13/14] applier: apply transaction in parallel
  2019-06-09 20:44 [tarantool-patches] [PATCH v3 00/14] Parallel applier Georgy Kirichenko
                   ` (11 preceding siblings ...)
  2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 12/14] txn: handle fiber stop event at transaction level Georgy Kirichenko
@ 2019-06-09 20:44 ` Georgy Kirichenko
  2019-06-13 15:17   ` Vladimir Davydov
  2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 14/14] test: fix flaky test Georgy Kirichenko
  13 siblings, 1 reply; 42+ messages in thread
From: Georgy Kirichenko @ 2019-06-09 20:44 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

The applier uses asynchronous transactions to batch journal writes. A
sequencer orders transaction execution and checks write results while
the applier reads from the network.

Closes: #1254
---
 src/box/applier.cc | 372 ++++++++++++++++++++++++++++++---------------
 src/box/applier.h  |   4 +
 src/box/box.cc     |   1 +
 3 files changed, 251 insertions(+), 126 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 5a92f6109..9f0efda5a 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -53,6 +53,231 @@
 
 STRS(applier_state, applier_STATE);
 
+/**
+ * Process a no-op request.
+ *
+ * A no-op request does not affect any space, but it
+ * promotes vclock and is written to WAL.
+ */
+static int
+process_nop(struct request *request)
+{
+	assert(request->type == IPROTO_NOP);
+	struct txn *txn = in_txn();
+	if (txn_begin_stmt(txn, NULL) == NULL)
+		return -1;
+	return txn_commit_stmt(txn, request);
+}
+
+static int
+apply_row(struct xrow_header *row)
+{
+	struct request request;
+	if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0)
+		return -1;
+	if (request.type == IPROTO_NOP)
+		return process_nop(&request);
+	struct space *space = space_cache_find(request.space_id);
+	if (space == NULL)
+		return -1;
+	if (box_process_rw(&request, space, NULL) != 0) {
+		say_error("error applying row: %s", request_str(&request));
+		return -1;
+	}
+	return 0;
+}
+
+/**
+ * A helper struct to link xrow objects in a list.
+ */
+struct applier_tx_row {
+	/* Next transaction row. */
+	struct stailq_entry next;
+	/* xrow_header struct for the current transaction row. */
+	struct xrow_header row;
+};
+
+struct sequencer {
+	struct stailq txn_queue;
+	struct fiber_cond txn_queue_cond;
+	struct vclock net_vclock;
+	struct fiber_cond tx_vclock_cond;
+	struct diag diag;
+	struct rlist on_fail;
+};
+
+static struct sequencer sequencer;
+
+static int
+sequencer_collect_f(va_list ap)
+{
+	(void) ap;
+	while (!fiber_is_cancelled()) {
+		while (stailq_empty(&sequencer.txn_queue)) {
+			if (!diag_is_empty(&sequencer.diag)) {
+				diag_clear(&sequencer.diag);
+				vclock_copy(&sequencer.net_vclock, &replicaset.vclock);
+			}
+			fiber_cond_wait(&sequencer.txn_queue_cond);
+			continue;
+		}
+		struct txn *txn =
+			stailq_shift_entry(&sequencer.txn_queue, struct txn,
+					   in_txn_cache);
+		if (txn_wait(txn) == 0) {
+			continue;
+		}
+		if (diag_is_empty(&sequencer.diag)) {
+			diag_move(&fiber()->diag, &sequencer.diag);
+			trigger_run(&sequencer.on_fail, NULL);
+		}
+	}
+	return 0;
+}
+
+void
+applier_init()
+{
+	stailq_create(&sequencer.txn_queue);
+	fiber_cond_create(&sequencer.txn_queue_cond);
+
+	rlist_create(&sequencer.on_fail);
+
+	vclock_create(&sequencer.net_vclock);
+	fiber_cond_create(&sequencer.tx_vclock_cond);
+	diag_create(&sequencer.diag);
+	struct fiber *collector = fiber_new("collector", sequencer_collect_f);
+	if (collector == NULL)
+		panic("Failed to create a sequencer collector fiber");
+	fiber_start(collector, NULL);
+}
+
+static inline void
+sequencer_on_fail(struct trigger *on_fail)
+{
+	trigger_add(&sequencer.on_fail, on_fail);
+}
+
+static void
+sequencer_rollback_cb(struct trigger *trigger, void *event)
+{
+	(void) trigger;
+	struct txn *txn = (struct txn *)event;
+	stailq_add_tail(&sequencer.txn_queue, &txn->in_txn_cache);
+	fiber_cond_signal(&sequencer.txn_queue_cond);
+
+	diag_set(ClientError, ER_WAL_IO);
+	diag_move(&fiber()->diag, &sequencer.diag);
+	trigger_run(&sequencer.on_fail, &sequencer);
+}
+
+static void
+sequencer_commit_cb(struct trigger *trigger, void *event)
+{
+	(void) trigger;
+	(void) event;
+	struct txn *txn = (struct txn *)event;
+	stailq_add_tail(&sequencer.txn_queue, &txn->in_txn_cache);
+	fiber_cond_signal(&sequencer.txn_queue_cond);
+	fiber_cond_broadcast(&sequencer.tx_vclock_cond);
+}
+
+static inline int
+sequencer_submit(uint32_t replica_id, int64_t lsn, struct stailq *rows)
+{
+	struct replica *replica = replica_by_id(replica_id);
+	struct latch *latch = (replica ? &replica->order_latch :
+			      &replicaset.applier.order_latch);
+
+	latch_lock(latch);
+	if (vclock_get(&sequencer.net_vclock, replica_id) >= lsn) {
+		/* Nothing to do. */
+		latch_unlock(latch);
+		return 0;
+	}
+
+	struct trigger *on_rollback;
+	struct trigger *on_commit;
+	/**
+	 * Explicitly begin the transaction so that we can
+	 * control fiber->gc life cycle and, in case of apply
+	 * conflict safely access failed xrow object and allocate
+	 * IPROTO_NOP on gc.
+	 */
+	struct txn *txn = txn_begin();
+	if (txn == NULL)
+		goto fail;
+	struct applier_tx_row *item;
+	stailq_foreach_entry(item, rows, next) {
+		struct xrow_header *row = &item->row;
+		int res = apply_row(row);
+		if (res != 0) {
+			struct error *e = diag_last_error(diag_get());
+			/*
+			 * In case of ER_TUPLE_FOUND error and enabled
+			 * replication_skip_conflict configuration
+			 * option, skip applying the foreign row and
+			 * replace it with NOP in the local write ahead
+			 * log.
+			 */
+			if (e->type == &type_ClientError &&
+			    box_error_code(e) == ER_TUPLE_FOUND &&
+			    replication_skip_conflict) {
+				diag_clear(diag_get());
+				row->type = IPROTO_NOP;
+				row->bodycnt = 0;
+				res = apply_row(row);
+			}
+		}
+		if (res != 0)
+			goto rollback;
+	}
+	/*
+	 * We are going to commit so it's a high time to check if
+	 * the current transaction has non-local effects.
+	 */
+	if (txn_is_distributed(txn)) {
+		/*
+		 * A transaction mixes remote and local rows.
+		 * Local rows must be replicated back, which
+		 * doesn't make sense since the master likely has
+		 * new changes which local rows may overwrite.
+		 * Raise an error.
+		 */
+		diag_set(ClientError, ER_UNSUPPORTED,
+			 "Replication", "distributed transactions");
+		goto rollback;
+	}
+
+	/* We are ready to submit txn to wal. */
+	on_rollback = (struct trigger *)txn_alloc(txn, sizeof(struct trigger));
+	trigger_create(on_rollback, sequencer_rollback_cb, NULL, NULL);
+	txn_on_rollback(txn, on_rollback);
+
+	on_commit = (struct trigger *)txn_alloc(txn, sizeof(struct trigger));
+	trigger_create(on_commit, sequencer_commit_cb, NULL, NULL);
+	txn_on_commit(txn, on_commit);
+
+	if (txn_write(txn) != 0)
+		goto fail;
+
+	vclock_follow(&sequencer.net_vclock, replica_id, lsn);
+	latch_unlock(latch);
+	return 0;
+
+rollback:
+	txn_rollback(txn);
+
+fail:
+	latch_unlock(latch);
+	if (diag_is_empty(&sequencer.diag)) {
+		diag_add_error(&sequencer.diag, diag_last_error(&fiber()->diag));
+		trigger_run(&sequencer.on_fail, NULL);
+	}
+	return -1;
+}
+
+
 static inline void
 applier_set_state(struct applier *applier, enum applier_state state)
 {
@@ -194,40 +419,6 @@ rollback:
 	return -1;
 }
 
-/**
- * Process a no-op request.
- *
- * A no-op request does not affect any space, but it
- * promotes vclock and is written to WAL.
- */
-static int
-process_nop(struct request *request)
-{
-	assert(request->type == IPROTO_NOP);
-	struct txn *txn = in_txn();
-	if (txn_begin_stmt(txn, NULL) == NULL)
-		return -1;
-	return txn_commit_stmt(txn, request);
-}
-
-static int
-apply_row(struct xrow_header *row)
-{
-	struct request request;
-	if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0)
-		return -1;
-	if (request.type == IPROTO_NOP)
-		return process_nop(&request);
-	struct space *space = space_cache_find(request.space_id);
-	if (space == NULL)
-		return -1;
-	if (box_process_rw(&request, space, NULL) != 0) {
-		say_error("error applying row: %s", request_str(&request));
-		return -1;
-	}
-	return 0;
-}
-
 /**
  * Connect to a remote host and authenticate the client.
  */
@@ -450,16 +641,6 @@ applier_join(struct applier *applier)
 	applier_set_state(applier, APPLIER_READY);
 }
 
-/**
- * A helper struct to link xrow objects in a list.
- */
-struct applier_tx_row {
-	/* Next transaction row. */
-	struct stailq_entry next;
-	/* xrow_header struct for the current transaction row. */
-	struct xrow_header row;
-};
-
 static struct applier_tx_row *
 applier_read_tx_row(struct applier *applier)
 {
@@ -565,70 +746,14 @@ applier_read_tx(struct applier *applier, struct stailq *rows)
 				    next)->row.is_commit);
 }
 
-/**
- * Apply all rows in the rows queue as a single transaction.
- *
- * Return 0 for success or -1 in case of an error.
- */
-static int
-applier_apply_tx(struct stailq *rows)
+static void
+applier_on_fail(struct trigger *trigger, void *event)
 {
-	/**
-	 * Explicitly begin the transaction so that we can
-	 * control fiber->gc life cycle and, in case of apply
-	 * conflict safely access failed xrow object and allocate
-	 * IPROTO_NOP on gc.
-	 */
-	struct txn *txn = txn_begin();
-	struct applier_tx_row *item;
-	if (txn == NULL)
-		diag_raise();
-	stailq_foreach_entry(item, rows, next) {
-		struct xrow_header *row = &item->row;
-		int res = apply_row(row);
-		if (res != 0) {
-			struct error *e = diag_last_error(diag_get());
-			/*
-			 * In case of ER_TUPLE_FOUND error and enabled
-			 * replication_skip_conflict configuration
-			 * option, skip applying the foreign row and
-			 * replace it with NOP in the local write ahead
-			 * log.
-			 */
-			if (e->type == &type_ClientError &&
-			    box_error_code(e) == ER_TUPLE_FOUND &&
-			    replication_skip_conflict) {
-				diag_clear(diag_get());
-				row->type = IPROTO_NOP;
-				row->bodycnt = 0;
-				res = apply_row(row);
-			}
-		}
-		if (res != 0)
-			goto rollback;
-	}
-	/*
-	 * We are going to commit so it's a high time to check if
-	 * the current transaction has non-local effects.
-	 */
-	if (txn_is_distributed(txn)) {
-		/*
-		 * A transaction mixes remote and local rows.
-		 * Local rows must be replicated back, which
-		 * doesn't make sense since the master likely has
-		 * new changes which local rows may overwrite.
-		 * Raise an error.
-		 */
-		diag_set(ClientError, ER_UNSUPPORTED,
-			 "Replication", "distributed transactions");
-		goto rollback;
-	}
-	return txn_commit(txn);
-
-rollback:
-	txn_rollback(txn);
-	fiber_gc();
-	return -1;
+	(void) event;
+	struct applier *applier = (struct applier *)trigger->data;
+	if (!diag_is_empty(&sequencer.diag))
+		diag_add_error(&applier->diag, diag_last_error(&sequencer.diag));
+	fiber_cancel(applier->reader);
 }
 
 /**
@@ -735,6 +860,10 @@ applier_subscribe(struct applier *applier)
 
 	applier->lag = TIMEOUT_INFINITY;
 
+	struct trigger on_fail;
+	trigger_create(&on_fail, applier_on_fail, applier, NULL);
+	sequencer_on_fail(&on_fail);
+
 	/*
 	 * Process a stream of rows from the binary log.
 	 */
@@ -763,31 +892,16 @@ applier_subscribe(struct applier *applier)
 		struct stailq rows;
 		applier_read_tx(applier, &rows);
 
+		fiber_cond_signal(&applier->writer_cond);
 		struct xrow_header *first_row =
 			&stailq_first_entry(&rows, struct applier_tx_row,
 					    next)->row;
 		applier->last_row_time = ev_monotonic_now(loop());
-		struct replica *replica = replica_by_id(first_row->replica_id);
-		struct latch *latch = (replica ? &replica->order_latch :
-				       &replicaset.applier.order_latch);
-		/*
-		 * In a full mesh topology, the same set of changes
-		 * may arrive via two concurrently running appliers.
-		 * Hence we need a latch to strictly order all changes
-		 * that belong to the same server id.
-		 */
-		latch_lock(latch);
-		if (vclock_get(&replicaset.vclock, first_row->replica_id) <
-		    first_row->lsn &&
-		    applier_apply_tx(&rows) != 0) {
-			latch_unlock(latch);
+		if (sequencer_submit(first_row->replica_id,
+				     first_row->lsn, &rows) != 0) {
+			trigger_clear(&on_fail);
 			diag_raise();
 		}
-		latch_unlock(latch);
-
-		if (applier->state == APPLIER_SYNC ||
-		    applier->state == APPLIER_FOLLOW)
-			fiber_cond_signal(&applier->writer_cond);
 		if (ibuf_used(ibuf) == 0)
 			ibuf_reset(ibuf);
 		fiber_gc();
@@ -872,6 +986,11 @@ applier_f(va_list ap)
 				return -1;
 			}
 		} catch (FiberIsCancelled *e) {
+			if (!diag_is_empty(&applier->diag)) {
+				diag_move(&applier->diag, &fiber()->diag);
+				applier_disconnect(applier, APPLIER_STOPPED);
+				break;
+			}
 			applier_disconnect(applier, APPLIER_OFF);
 			break;
 		} catch (SocketError *e) {
@@ -960,6 +1079,7 @@ applier_new(const char *uri)
 	rlist_create(&applier->on_state);
 	fiber_cond_create(&applier->resume_cond);
 	fiber_cond_create(&applier->writer_cond);
+	diag_create(&applier->diag);
 
 	return applier;
 }
diff --git a/src/box/applier.h b/src/box/applier.h
index 5bff90031..b0e56add6 100644
--- a/src/box/applier.h
+++ b/src/box/applier.h
@@ -114,8 +114,12 @@ struct applier {
 	bool is_paused;
 	/** Condition variable signaled to resume the applier. */
 	struct fiber_cond resume_cond;
+	struct diag diag;
 };
 
+void
+applier_init();
+
 /**
  * Start a client to a remote master using a background fiber.
  *
diff --git a/src/box/box.cc b/src/box/box.cc
index 510f3fc99..49f8f24af 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -2158,6 +2158,7 @@ box_cfg_xc(void)
 	port_init();
 	iproto_init();
 	sql_init();
+	applier_init();
 
 	int64_t wal_max_rows = box_check_wal_max_rows(cfg_geti64("rows_per_wal"));
 	int64_t wal_max_size = box_check_wal_max_size(cfg_geti64("wal_max_size"));
-- 
2.21.0

^ permalink raw reply	[flat|nested] 42+ messages in thread

* [tarantool-patches] [PATCH v3 14/14] test: fix flaky test
  2019-06-09 20:44 [tarantool-patches] [PATCH v3 00/14] Parallel applier Georgy Kirichenko
                   ` (12 preceding siblings ...)
  2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 13/14] applier: apply transaction in parallel Georgy Kirichenko
@ 2019-06-09 20:44 ` Georgy Kirichenko
  13 siblings, 0 replies; 42+ messages in thread
From: Georgy Kirichenko @ 2019-06-09 20:44 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

This test fails sporadically: make it deterministic by tuning the relay
error-injection timeout and flushing the WAL through a local space write.
---
 test/replication/sync.result          |  7 +++++--
 test/replication/sync.test.lua        |  4 ++--
 test/replication/transaction.result   | 16 +++++++++++++++-
 test/replication/transaction.test.lua |  7 ++++++-
 4 files changed, 28 insertions(+), 6 deletions(-)

diff --git a/test/replication/sync.result b/test/replication/sync.result
index eddc7cbc8..6b5a14d3f 100644
--- a/test/replication/sync.result
+++ b/test/replication/sync.result
@@ -46,7 +46,7 @@ function fill()
         box.space.test:replace{i}
     end
     fiber.create(function()
-        box.error.injection.set('ERRINJ_RELAY_TIMEOUT', 0.1)
+        box.error.injection.set('ERRINJ_RELAY_TIMEOUT', 0.0025)
         test_run:wait_cond(function()
             local r = box.info.replication[2]
             return r ~= nil and r.downstream ~= nil and
@@ -55,7 +55,6 @@ function fill()
         for i = count + 101, count + 200 do
             box.space.test:replace{i}
         end
-        box.error.injection.set('ERRINJ_RELAY_TIMEOUT', 0)
     end)
     count = count + 200
 end;
@@ -250,6 +249,10 @@ test_run:cmd("switch default")
 ---
 - true
 ...
+box.error.injection.set('ERRINJ_RELAY_TIMEOUT', 0)
+---
+- ok
+...
 box.error.injection.set('ERRINJ_WAL_DELAY', true)
 ---
 - ok
diff --git a/test/replication/sync.test.lua b/test/replication/sync.test.lua
index 52ce88fe2..f0f530ad4 100644
--- a/test/replication/sync.test.lua
+++ b/test/replication/sync.test.lua
@@ -30,7 +30,7 @@ function fill()
         box.space.test:replace{i}
     end
     fiber.create(function()
-        box.error.injection.set('ERRINJ_RELAY_TIMEOUT', 0.1)
+        box.error.injection.set('ERRINJ_RELAY_TIMEOUT', 0.0025)
         test_run:wait_cond(function()
             local r = box.info.replication[2]
             return r ~= nil and r.downstream ~= nil and
@@ -39,7 +39,6 @@ function fill()
         for i = count + 101, count + 200 do
             box.space.test:replace{i}
         end
-        box.error.injection.set('ERRINJ_RELAY_TIMEOUT', 0)
     end)
     count = count + 200
 end;
@@ -136,6 +135,7 @@ box.cfg{replication_sync_lag = 1}
 box.cfg{replication_sync_timeout = 10}
 
 test_run:cmd("switch default")
+box.error.injection.set('ERRINJ_RELAY_TIMEOUT', 0)
 box.error.injection.set('ERRINJ_WAL_DELAY', true)
 test_run:cmd("setopt delimiter ';'")
 _ = fiber.create(function()
diff --git a/test/replication/transaction.result b/test/replication/transaction.result
index 8c2ac6ee4..c54c1e8d5 100644
--- a/test/replication/transaction.result
+++ b/test/replication/transaction.result
@@ -7,12 +7,21 @@ test_run = env.new()
 box.schema.user.grant('guest', 'replication')
 ---
 ...
-s = box.schema.space.create('test', {engine = test_run:get_cfg('engine')})
+engine = test_run:get_cfg('engine')
+---
+...
+s = box.schema.space.create('test', {engine = engine})
 ---
 ...
 _ = s:create_index('pk')
 ---
 ...
+l = box.schema.space.create('l_space', {engine = engine, is_local = true})
+---
+...
+_ = l:create_index('pk')
+---
+...
 -- transaction w/o conflict
 box.begin() s:insert({1, 'm'}) s:insert({2, 'm'}) box.commit()
 ---
@@ -92,6 +101,11 @@ box.cfg{replication = replication}
 ---
 ...
 -- replication stopped of third transaction
+-- flush wal
+box.space.l_space:replace({1})
+---
+- [1]
+...
 v1[1] + 2 == box.info.vclock[1]
 ---
 - true
diff --git a/test/replication/transaction.test.lua b/test/replication/transaction.test.lua
index f25a4737d..4e0323128 100644
--- a/test/replication/transaction.test.lua
+++ b/test/replication/transaction.test.lua
@@ -1,9 +1,12 @@
 env = require('test_run')
 test_run = env.new()
 box.schema.user.grant('guest', 'replication')
+engine = test_run:get_cfg('engine')
 
-s = box.schema.space.create('test', {engine = test_run:get_cfg('engine')})
+s = box.schema.space.create('test', {engine = engine})
 _ = s:create_index('pk')
+l = box.schema.space.create('l_space', {engine = engine, is_local = true})
+_ = l:create_index('pk')
 
 -- transaction w/o conflict
 box.begin() s:insert({1, 'm'}) s:insert({2, 'm'}) box.commit()
@@ -37,6 +40,8 @@ replication = box.cfg.replication
 box.cfg{replication = {}}
 box.cfg{replication = replication}
 -- replication stopped of third transaction
+-- flush wal
+box.space.l_space:replace({1})
 v1[1] + 2 == box.info.vclock[1]
 box.space.test:select()
 -- check replication status
-- 
2.21.0


* [tarantool-patches] Re: [PATCH v3 01/14] txn: Fire a trigger after a transaction finalization
  2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 01/14] txn: Fire a trigger after a transaction finalization Georgy Kirichenko
@ 2019-06-09 21:59   ` Konstantin Osipov
  2019-06-11 11:42   ` [tarantool-patches] " Vladimir Davydov
  1 sibling, 0 replies; 42+ messages in thread
From: Konstantin Osipov @ 2019-06-09 21:59 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

* Georgy Kirichenko <georgy@tarantool.org> [19/06/09 23:47]:
> Fire the transaction trigger after transaction finalization. This makes
> it possible not to see the transaction's dismissed changes in case of
> rollback.

LGTM. 


-- 
Konstantin Osipov, Moscow, Russia


* Re: [tarantool-patches] [PATCH v3 01/14] txn: Fire a trigger after a transaction finalization
  2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 01/14] txn: Fire a trigger after a transaction finalization Georgy Kirichenko
  2019-06-09 21:59   ` [tarantool-patches] " Konstantin Osipov
@ 2019-06-11 11:42   ` Vladimir Davydov
  1 sibling, 0 replies; 42+ messages in thread
From: Vladimir Davydov @ 2019-06-11 11:42 UTC (permalink / raw)
  To: Georgy Kirichenko; +Cc: tarantool-patches

On Sun, Jun 09, 2019 at 11:44:30PM +0300, Georgy Kirichenko wrote:
> diff --git a/test/box/ddl.test.lua b/test/box/ddl.test.lua
> index 101bc6f9b..301f7e6c1 100644
> --- a/test/box/ddl.test.lua
> +++ b/test/box/ddl.test.lua
> @@ -270,3 +270,18 @@ box.rollback()
>  
>  _ = c:get()
>  test_latch:drop() -- this is where everything stops
> +
> +-- gh-4276 - check grant privilege rollback
> +_ = box.schema.user.create('testg')
> +_ = box.schema.space.create('testg'):create_index('pk')
> +
> +box.error.injection.set('ERRINJ_WAL_IO', true)

The test must be disabled in release mode. Moved it to
box/errinj.test.lua and pushed to master.

> +-- the grant operation above fails and test hasn't any space test permissions
> +box.schema.user.grant('testg', 'read,write', 'space', 'testg')
> +-- switch user and check they couldn't select
> +box.session.su('testg')
> +box.space.testg:select()
> +box.session.su('admin')
> +box.error.injection.set('ERRINJ_WAL_IO', false)
> +box.schema.user.drop('testg')
> +box.space.testg:drop()


* Re: [tarantool-patches] [PATCH v3 02/14] ddl: synchronize privileges cache with actual data state.
  2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 02/14] ddl: synchronize privileges cache with actual data state Georgy Kirichenko
@ 2019-06-11 13:13   ` Vladimir Davydov
  0 siblings, 0 replies; 42+ messages in thread
From: Vladimir Davydov @ 2019-06-11 13:13 UTC (permalink / raw)
  To: Georgy Kirichenko; +Cc: tarantool-patches

On Sun, Jun 09, 2019 at 11:44:31PM +0300, Georgy Kirichenko wrote:
> We synchronize cached data with the actual data state: apply the change
> in on_replace and undo it in on_rollback.
> ---
>  src/box/alter.cc | 37 ++++++++++++++++---------------------
>  1 file changed, 16 insertions(+), 21 deletions(-)
> 
> diff --git a/src/box/alter.cc b/src/box/alter.cc
> index ed9e55907..c4a1c52a9 100644
> --- a/src/box/alter.cc
> +++ b/src/box/alter.cc
> @@ -2938,31 +2938,23 @@ grant_or_revoke(struct priv_def *priv)
>  
>  /** A trigger called on rollback of grant, or on commit of revoke. */
>  static void
> -revoke_priv(struct trigger * /* trigger */, void *event)
> +revoke_priv(struct trigger *trigger, void *event)
>  {
> -	struct txn *txn = (struct txn *) event;
> -	struct txn_stmt *stmt = txn_last_stmt(txn);
> -	struct tuple *tuple = (stmt->new_tuple ?
> -			       stmt->new_tuple : stmt->old_tuple);
> +	(void) event;
> +	struct tuple *tuple = (struct tuple *)trigger->data;
>  	struct priv_def priv;
>  	priv_def_create_from_tuple(&priv, tuple);
> -	/*
> -	 * Access to the object has been removed altogether so
> -	 * there should be no grants at all. If only some grants
> -	 * were removed, modify_priv trigger would have been
> -	 * invoked.
> -	 */
>  	priv.access = 0;
>  	grant_or_revoke(&priv);
>  }
>  
>  /** A trigger called on rollback of grant, or on commit of revoke. */
>  static void
> -modify_priv(struct trigger * /* trigger */, void *event)
> +modify_priv(struct trigger *trigger, void *event)
>  {
> -	struct txn_stmt *stmt = txn_last_stmt((struct txn *) event);
> +	(void) event;
>  	struct priv_def priv;
> -	priv_def_create_from_tuple(&priv, stmt->new_tuple);
> +	priv_def_create_from_tuple(&priv, (struct tuple *)trigger->data);
>  	grant_or_revoke(&priv);
>  }
>  
> @@ -2985,21 +2977,24 @@ on_replace_dd_priv(struct trigger * /* trigger */, void *event)
>  		priv_def_check(&priv, PRIV_GRANT);
>  		grant_or_revoke(&priv);
>  		struct trigger *on_rollback =
> -			txn_alter_trigger_new(revoke_priv, NULL);
> +			txn_alter_trigger_new(revoke_priv, new_tuple);
>  		txn_on_rollback(txn, on_rollback);
>  	} else if (new_tuple == NULL) {                /* revoke */
>  		assert(old_tuple);
>  		priv_def_create_from_tuple(&priv, old_tuple);
>  		priv_def_check(&priv, PRIV_REVOKE);
> -		struct trigger *on_commit =
> -			txn_alter_trigger_new(revoke_priv, NULL);
> -		txn_on_commit(txn, on_commit);
> +		priv.access = 0;
> +		grant_or_revoke(&priv);
> +		struct trigger *on_rollback =
> +			txn_alter_trigger_new(modify_priv, old_tuple);
> +		txn_on_rollback(txn, on_rollback);
>  	} else {                                       /* modify */
>  		priv_def_create_from_tuple(&priv, new_tuple);
>  		priv_def_check(&priv, PRIV_GRANT);
> -		struct trigger *on_commit =
> -			txn_alter_trigger_new(modify_priv, NULL);
> -		txn_on_commit(txn, on_commit);
> +		grant_or_revoke(&priv);
> +		struct trigger *on_rollback =
> +			txn_alter_trigger_new(modify_priv, old_tuple);
> +		txn_on_rollback(txn, on_rollback);
>  	}
>  }

Overall, I like this change as we'll need it anyway to implement DDL
batching (i.e. transaction support for non-yielding DDL statements).
Pushed to master with some cosmetic changes:

diff --git a/src/box/alter.cc b/src/box/alter.cc
index 1b9e2238..3d44362b 100644
--- a/src/box/alter.cc
+++ b/src/box/alter.cc
@@ -3047,7 +3047,7 @@ grant_or_revoke(struct priv_def *priv)
 	}
 }
 
-/** A trigger called on rollback of grant, or on commit of revoke. */
+/** A trigger called on rollback of grant. */
 static void
 revoke_priv(struct trigger *trigger, void *event)
 {
@@ -3059,13 +3059,14 @@ revoke_priv(struct trigger *trigger, void *event)
 	grant_or_revoke(&priv);
 }
 
-/** A trigger called on rollback of grant, or on commit of revoke. */
+/** A trigger called on rollback of revoke or modify. */
 static void
 modify_priv(struct trigger *trigger, void *event)
 {
 	(void) event;
+	struct tuple *tuple = (struct tuple *)trigger->data;
 	struct priv_def priv;
-	priv_def_create_from_tuple(&priv, (struct tuple *)trigger->data);
+	priv_def_create_from_tuple(&priv, tuple);
 	grant_or_revoke(&priv);
 }


* Re: [tarantool-patches] [PATCH v3 04/14] ddl: place alter structures onto a txn memory region
  2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 04/14] ddl: place alter structures onto a txn memory region Georgy Kirichenko
@ 2019-06-11 14:14   ` Vladimir Davydov
  0 siblings, 0 replies; 42+ messages in thread
From: Vladimir Davydov @ 2019-06-11 14:14 UTC (permalink / raw)
  To: Georgy Kirichenko; +Cc: tarantool-patches

On Sun, Jun 09, 2019 at 11:44:33PM +0300, Georgy Kirichenko wrote:
> Since the lifecycle of alter schema triggers is bound to a transaction,
> the corresponding structures should be placed on a txn memory region
> instead of the fiber gc space.
> 
> Prerequisites: #1254
> ---
>  src/box/alter.cc |  8 ++++----
>  src/box/vinyl.c  | 10 ++++------
>  2 files changed, 8 insertions(+), 10 deletions(-)
> 
> diff --git a/src/box/alter.cc b/src/box/alter.cc
> index c4a1c52a9..671209b51 100644
> --- a/src/box/alter.cc
> +++ b/src/box/alter.cc
> @@ -668,7 +668,7 @@ public:
>  
>  	void *operator new(size_t size)
>  	{
> -		return region_aligned_calloc_xc(&fiber()->gc, size,
> +		return region_aligned_calloc_xc(&in_txn()->region, size,
>  						alignof(uint64_t));
>  	}
>  	void operator delete(void * /* ptr */) {}
> @@ -682,7 +682,7 @@ static struct trigger *
>  txn_alter_trigger_new(trigger_f run, void *data)
>  {
>  	struct trigger *trigger = (struct trigger *)
> -		region_calloc_object_xc(&fiber()->gc, struct trigger);
> +		txn_alloc(in_txn(), sizeof(struct trigger));

I don't like that in some places you use in_txn()->region while in
> others txn_alloc(). BTW, you lost the OOM exception here.

I removed txn_alloc() in favor of accessing txn->region directly and
pushed this patch to master.


* Re: [tarantool-patches] [PATCH v3 05/14] txn: get rid of autocommit from a txn structure
  2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 05/14] txn: get rid of autocommit from a txn structure Georgy Kirichenko
@ 2019-06-13 14:11   ` Vladimir Davydov
  2019-06-16 16:20     ` [tarantool-patches] " Konstantin Osipov
  2019-06-16 16:14   ` Konstantin Osipov
  1 sibling, 1 reply; 42+ messages in thread
From: Vladimir Davydov @ 2019-06-13 14:11 UTC (permalink / raw)
  To: Georgy Kirichenko; +Cc: tarantool-patches

On Sun, Jun 09, 2019 at 11:44:34PM +0300, Georgy Kirichenko wrote:
> Move transaction auto start and auto commit behavior to the box level.
> From now on, a transaction won't start or commit automatically without
> txn_begin/txn_commit invocations. This is part of a bigger transaction
> refactoring in order to implement detachable transactions and a parallel
> applier.

TBO I don't understand why you need to move autocommit logic out of txn.
The applier code already calls txn_begin/txn_commit explicitly and AFAIU
this is the only place you need to care about. The code looks less
readable and more bulky after this change IMHO. Let's discuss f2f.

See a few comments inline.

> 
> Prerequisites: #1254
> ---
>  src/box/applier.cc     | 35 +++++++++++++---
>  src/box/box.cc         | 94 +++++++++++++++++++++++++++++++++---------
>  src/box/index.cc       | 10 ++---
>  src/box/memtx_engine.c | 10 ++++-
>  src/box/memtx_space.c  |  8 ++--
>  src/box/sql.c          |  2 +-
>  src/box/txn.c          | 46 +++++++--------------
>  src/box/txn.h          | 16 +++----
>  src/box/vinyl.c        | 12 ++----
>  src/box/vy_scheduler.c |  6 +--
>  10 files changed, 148 insertions(+), 91 deletions(-)
> 
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 373e1feb9..e3203a4c8 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -172,11 +172,26 @@ applier_writer_f(va_list ap)
>  static int
>  apply_initial_join_row(struct xrow_header *row)
>  {
> +	struct txn *txn = txn_begin();
> +	if (txn == NULL)
> +		return -1;
>  	struct request request;
>  	xrow_decode_dml(row, &request, dml_request_key_map(row->type));
> -	struct space *space = space_cache_find_xc(request.space_id);
> +	struct space *space = space_cache_find(request.space_id);
> +	if (space == NULL)
> +		goto rollback;
>  	/* no access checks here - applier always works with admin privs */
> -	return space_apply_initial_join_row(space, &request);
> +	if (space_apply_initial_join_row(space, &request))
> +		goto rollback;
> +	int rc;
> +	rc = txn_commit(txn);
> +	if (rc < 0)
> +		return -1;
> +	fiber_gc();

fiber_gc() must be called after txn_commit() while txn_rollback() calls
it by itself. This is a mess. As a matter of fact, I don't understand
why we need to call this function at all now, when we have txn->region
which is used to allocate all txn stuff. AFAIU fiber->gc is only used
for allocating temporary objects, in which case the caller is supposed
to clean up by himself (using region_truncate). That said, I don't think
we need to call fiber_gc() after txn_commit() at all.

There's one exception though - it's vdbe_add_new_autoinc_id, which
stores autogenerated ids on fiber->gc and doesn't delete them until
the corresponding SQL request is complete (this is why all this mess
with fiber_gc() started AFAIR). However, I think we could and should
use a mempool for such allocations.

I think that before pushing this patch, we should clean up fiber_gc()
usage, otherwise it's really easy to lose track of where we must and where
we mustn't call fiber_gc() now. Actually, I think we should do that
irrespective of whether we commit this patch or not.
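The caller-managed cleanup described above can be sketched with a toy
bump allocator; the names mirror Tarantool's region API, but this is an
illustrative stand-in, not the real small/region implementation:

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Toy bump allocator standing in for fiber->gc. */
struct region {
	char buf[4096];
	size_t used;
};

static void *
region_alloc(struct region *r, size_t size)
{
	if (r->used + size > sizeof(r->buf))
		return NULL;
	void *p = r->buf + r->used;
	r->used += size;
	return p;
}

static size_t
region_used(const struct region *r)
{
	return r->used;
}

static void
region_truncate(struct region *r, size_t size)
{
	assert(size <= r->used);
	r->used = size;
}

/*
 * The pattern in question: the caller snapshots the region before
 * allocating temporaries and truncates back when done, so no
 * trailing fiber_gc() call is needed after commit.
 */
static int
do_request(struct region *gc)
{
	size_t svp = region_used(gc);	/* savepoint */
	char *tmp = region_alloc(gc, 128);
	if (tmp == NULL)
		return -1;
	memset(tmp, 0, 128);		/* ... use the temporary ... */
	region_truncate(gc, svp);	/* caller cleans up after itself */
	return 0;
}
```

Under this discipline each consumer of fiber->gc restores the savepoint
it took, and a global fiber_gc() sweep after txn_commit() becomes
unnecessary.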

> @@ -403,8 +418,16 @@ applier_join(struct applier *applier)
>  		applier->last_row_time = ev_monotonic_now(loop());
>  		if (iproto_type_is_dml(row.type)) {
>  			vclock_follow_xrow(&replicaset.vclock, &row);
> -			if (apply_row(&row) != 0)
> +			struct txn *txn = txn_begin();
> +			if (txn == NULL)
> +				diag_raise();
> +			if (apply_row(&row) != 0) {
> +				txn_rollback();
> +				diag_raise();
> +			}
> +			if (txn_commit(txn) != 0)
>  				diag_raise();
> +			fiber_gc();

I'd wrap this into a helper function (apply_final_join_row).

> diff --git a/src/box/box.cc b/src/box/box.cc
> index 57419ee01..7f23716e5 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -169,34 +169,62 @@ int
>  box_process_rw(struct request *request, struct space *space,
>  	       struct tuple **result)
>  {
> +	struct tuple *tuple = NULL;
> +	struct txn *txn = in_txn();
> +	bool is_autocommit = txn == NULL;
> +	if (is_autocommit && (txn = txn_begin()) == NULL)
> +		return -1;
>  	assert(iproto_type_is_dml(request->type));
>  	rmean_collect(rmean_box, request->type, 1);
>  	if (access_check_space(space, PRIV_W) != 0)
> -		return -1;
> -	struct txn *txn = txn_begin_stmt(space);
> -	if (txn == NULL)
> -		return -1;
> -	struct tuple *tuple;
> +		goto fail;
> +	if (txn_begin_stmt(txn, space) == NULL)
> +		goto fail;
>  	if (space_execute_dml(space, txn, request, &tuple) != 0) {
> -		txn_rollback_stmt();
> -		return -1;
> +		txn_rollback_stmt(txn);
> +		goto fail;
> +	}
> +	if (result != NULL)
> +		*result = tuple;
> +
> +	if (result == NULL || tuple == NULL) {
> +		if (txn_commit_stmt(txn, request) != 0)
> +			goto fail;
> +		if (is_autocommit) {
> +			if (txn_commit(txn) != 0)
> +				return -1;
> +			fiber_gc();
> +		}
> +		return 0;
>  	}
> -	if (result == NULL)
> -		return txn_commit_stmt(txn, request);
> -	*result = tuple;
> -	if (tuple == NULL)
> -		return txn_commit_stmt(txn, request);
>  	/*
>  	 * Pin the tuple locally before the commit,
>  	 * otherwise it may go away during yield in
>  	 * when WAL is written in autocommit mode.
>  	 */
>  	tuple_ref(tuple);
> -	int rc = txn_commit_stmt(txn, request);
> -	if (rc == 0)
> -		tuple_bless(tuple);
> +
> +	if (txn_commit_stmt(txn, request)) {
> +		/* Unref tuple and rollback if autocommit. */
> +		tuple_unref(tuple);
> +		goto fail;
> +	}
> +	if (is_autocommit) {
> +		if (txn_commit(txn) != 0) {
> +			/* Unref tuple and exit. */
> +			tuple_unref(tuple);
> +			return -1;
> +		}
> +	        fiber_gc();
> +	}
> +	tuple_bless(tuple);

This looks like a lot of copy-and-paste. Please think about how to get
rid of the code duplication.
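One way to fold the duplicated unref/commit/rollback tails is to route
every exit through a single epilogue that knows whether the function
owns the transaction. This is a hypothetical sketch with stub txn
functions, not the real box API:

```c
#include <assert.h>
#include <stdbool.h>

/* Stubs standing in for the real transaction machinery. */
struct txn {
	bool committed;
	bool rolled_back;
};

static int
txn_commit(struct txn *txn)
{
	txn->committed = true;
	return 0;
}

static void
txn_rollback(struct txn *txn)
{
	txn->rolled_back = true;
}

/*
 * Single exit point: commit or roll back only if this call owns the
 * transaction (autocommit mode); otherwise just propagate rc.  Call
 * sites then need one goto label instead of repeating the
 * commit/rollback combinations.
 */
static int
txn_finish(struct txn *txn, bool is_autocommit, int rc)
{
	if (!is_autocommit)
		return rc;
	if (rc != 0) {
		txn_rollback(txn);
		return -1;
	}
	return txn_commit(txn);
}
```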

> @@ -299,10 +327,20 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row)
>  	xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
>  	if (request.type != IPROTO_NOP) {
>  		struct space *space = space_cache_find_xc(request.space_id);
> +		struct txn *txn = txn_begin();
> +		if (txn == NULL) {
> +			say_error("error applying row: %s", request_str(&request));
> +			diag_raise();
> +		}

Why? box_process_rw can handle autocommit statements.

>  		if (box_process_rw(&request, space, NULL) != 0) {
>  			say_error("error applying row: %s", request_str(&request));
> +			txn_rollback();
>  			diag_raise();
>  		}
> +		if (txn_commit(txn) != 0) {
> +			diag_raise();
> +		}
> +		fiber_gc();
>  	}
>  	struct wal_stream *xstream =
>  		container_of(stream, struct wal_stream, base);
> @@ -1313,9 +1351,17 @@ box_sequence_reset(uint32_t seq_id)
>  static inline void
>  box_register_replica(uint32_t id, const struct tt_uuid *uuid)
>  {
> +	struct txn *txn = txn_begin();
> +	if (txn == NULL)
> +		diag_raise();
>  	if (boxk(IPROTO_INSERT, BOX_CLUSTER_ID, "[%u%s]",

Again, boxk calls box_process_rw, which handles autocommit. No need to
call txn_begin/txn_commit here explicitly.

> @@ -1636,10 +1682,18 @@ box_set_replicaset_uuid(const struct tt_uuid *replicaset_uuid)
>  		uu = *replicaset_uuid;
>  	else
>  		tt_uuid_create(&uu);
> +	struct txn *txn = txn_begin();
> +	if (txn == NULL)
> +		diag_raise();
>  	/* Save replica set UUID in _schema */
>  	if (boxk(IPROTO_INSERT, BOX_SCHEMA_ID, "[%s%s]", "cluster",

Same here.

> diff --git a/src/box/index.cc b/src/box/index.cc
> index 4a444e5d0..7f26c9bc2 100644
> --- a/src/box/index.cc
> +++ b/src/box/index.cc
> @@ -274,7 +274,7 @@ box_index_min(uint32_t space_id, uint32_t index_id, const char *key,
>  	if (txn_begin_ro_stmt(space, &txn) != 0)
>  		return -1;
>  	if (index_min(index, key, part_count, result) != 0) {
> -		txn_rollback_stmt();
> +		txn_rollback_stmt(txn);

In general I like that txn_rollback_stmt and txn_rollback now take txn.
This makes the API more symmetric. It is probably worth doing in a
separate patch, even if we realize we don't need this one. As a matter
of fact, a separate patch would also make review easier. I'd appreciate
it if you could split this patch.

> diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
> index f4312484a..149215b87 100644
> --- a/src/box/memtx_engine.c
> +++ b/src/box/memtx_engine.c
> @@ -272,16 +272,22 @@ memtx_engine_recover_snapshot_row(struct memtx_engine *memtx,
>  		diag_set(ClientError, ER_CROSS_ENGINE_TRANSACTION);
>  		return -1;
>  	}
> +	struct txn *txn = txn_begin();
> +	if (txn == NULL)
> +		return -1;
>  	/* no access checks here - applier always works with admin privs */
> -	if (space_apply_initial_join_row(space, &request) != 0)
> +	if (space_apply_initial_join_row(space, &request) != 0) {
> +		txn_rollback();
>  		return -1;
> +	}
> +	int rc = txn_commit(txn);

This is the second place where you surround apply_initial_join_row with
txn_begin/txn_commit. Maybe it's worth doing it in the callback itself?
Note, we only need this in the case of memtx - vinyl doesn't create a
txn object for initial recovery or join.

> diff --git a/src/box/memtx_space.c b/src/box/memtx_space.c
> index 1d209033c..1e630783b 100644
> --- a/src/box/memtx_space.c
> +++ b/src/box/memtx_space.c
> @@ -310,10 +310,10 @@ memtx_space_apply_initial_join_row(struct space *space, struct request *request)
>  		return -1;
>  	}
>  	request->header->replica_id = 0;
> -	struct txn *txn = txn_begin_stmt(space);
> -	if (txn == NULL)
> +	struct txn *txn = in_txn();
> +	struct txn_stmt *stmt = txn_begin_stmt(txn, space);
> +	if (stmt == NULL)
>  		return -1;
> -	struct txn_stmt *stmt = txn_current_stmt(txn);

AFAICS this is the only place where you use txn_stmt returned by
txn_begin_stmt() - in other places you simply check if it's NULL (which
indicates an error). Why did you decide to make txn_begin_stmt() return
txn_stmt rather than an error code? The API looks lopsided now:

  txn_begin() => txn
  txn_begin_stmt(txn, request) => txn_stmt
  txn_commit_stmt(txn) => int
  txn_commit(txn) => int

Let's please make txn_begin_stmt return int and use txn_current_stmt
here to get txn_stmt, as we used to. This shouldn't affect performance,
and the code would look neater this way.
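The suggested shape - txn_begin_stmt() returning an error code and the
statement fetched separately via txn_current_stmt() - might look like
the following toy model. This is a hypothetical simplification; the
real struct txn in txn.h is far richer:

```c
#include <assert.h>
#include <stddef.h>

struct txn_stmt {
	int dummy;
};

/* Toy transaction with a fixed statement list. */
struct txn {
	struct txn_stmt stmts[8];
	int n_stmts;
	int in_sub_stmt;
};

/* Returns 0 on success, -1 on error -- symmetric with txn_commit_stmt(). */
static int
txn_begin_stmt(struct txn *txn)
{
	if (txn->n_stmts >= 8)
		return -1;
	txn->stmts[txn->n_stmts].dummy = 0;
	txn->n_stmts++;
	txn->in_sub_stmt++;
	return 0;
}

/* The statement pointer is fetched explicitly only when needed. */
static struct txn_stmt *
txn_current_stmt(struct txn *txn)
{
	if (txn->n_stmts == 0)
		return NULL;
	return &txn->stmts[txn->n_stmts - 1];
}

static int
txn_commit_stmt(struct txn *txn)
{
	assert(txn->in_sub_stmt > 0);
	txn->in_sub_stmt--;
	return 0;
}
```

With this shape all four calls (txn_begin, txn_begin_stmt,
txn_commit_stmt, txn_commit) return int, and only the rare caller that
actually needs the statement calls txn_current_stmt().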

> diff --git a/src/box/txn.c b/src/box/txn.c
> index 1d8271e51..21f34e526 100644
> --- a/src/box/txn.c
> +++ b/src/box/txn.c
> @@ -441,14 +428,11 @@ fail:
>  }
>  
>  void
> -txn_rollback_stmt()
> +txn_rollback_stmt(struct txn *txn)
>  {
> -	struct txn *txn = in_txn();
>  	if (txn == NULL || txn->in_sub_stmt == 0)
>  		return;

Hmm, how's that even possible now?


* Re: [tarantool-patches] [PATCH v3 06/14] txn: get rid of fiber_gc from txn_rollback
  2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 06/14] txn: get rid of fiber_gc from txn_rollback Georgy Kirichenko
@ 2019-06-13 14:12   ` Vladimir Davydov
  2019-06-13 19:28     ` Георгий Кириченко
  2019-06-16 16:38   ` [tarantool-patches] " Konstantin Osipov
  1 sibling, 1 reply; 42+ messages in thread
From: Vladimir Davydov @ 2019-06-13 14:12 UTC (permalink / raw)
  To: Georgy Kirichenko; +Cc: tarantool-patches

On Sun, Jun 09, 2019 at 11:44:35PM +0300, Georgy Kirichenko wrote:
> Don't touch a fiber gc storage on a transaction rollback explicitly.
> This relaxes dependencies between fiber and transaction life cycles.
> 
> Prerequisites: #1254
> ---
>  src/box/applier.cc     |  8 +++++---
>  src/box/box.cc         | 14 +++++++++-----
>  src/box/call.c         | 22 ++++++++++++++++------
>  src/box/memtx_engine.c |  3 ++-
>  src/box/txn.c          | 35 +++++++++++++++++------------------
>  src/box/txn.h          |  8 ++++++--
>  src/box/vy_scheduler.c | 10 +++++++---
>  7 files changed, 62 insertions(+), 38 deletions(-)
> 
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index e3203a4c8..5a92f6109 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -190,7 +190,7 @@ apply_initial_join_row(struct xrow_header *row)
>  	fiber_gc();
>  	return rc;
>  rollback:
> -	txn_rollback();
> +	txn_rollback(txn);
>  	return -1;
>  }
>  
> @@ -422,7 +422,8 @@ applier_join(struct applier *applier)
>  			if (txn == NULL)
>  				diag_raise();
>  			if (apply_row(&row) != 0) {
> -				txn_rollback();
> +				txn_rollback(txn);
> +				fiber_gc();
>  				diag_raise();
>  			}
>  			if (txn_commit(txn) != 0)
> @@ -625,7 +626,8 @@ applier_apply_tx(struct stailq *rows)
>  	return txn_commit(txn);
>  
>  rollback:
> -	txn_rollback();
> +	txn_rollback(txn);
> +	fiber_gc();
>  	return -1;
>  }

As I wrote earlier, I'm pretty convinced we don't need to call
fiber_gc() after txn_commit/rollback at all. Please consider
removing it in a separate patch.


* Re: [tarantool-patches] [PATCH v3 07/14] wal: remove fiber from a journal_entry structure
  2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 07/14] wal: remove fiber from a journal_entry structure Georgy Kirichenko
@ 2019-06-13 14:17   ` Vladimir Davydov
  2019-06-13 19:33     ` Георгий Кириченко
  0 siblings, 1 reply; 42+ messages in thread
From: Vladimir Davydov @ 2019-06-13 14:17 UTC (permalink / raw)
  To: Georgy Kirichenko; +Cc: tarantool-patches

On Sun, Jun 09, 2019 at 11:44:36PM +0300, Georgy Kirichenko wrote:
> Use a fiber_cond to signal a condition and wake up a waiting fiber.
> This relaxes friction between fiber and transaction life cycles.

I don't see how it relaxes anything ;-)

Please be more specific when writing comments.

> 
> Prerequisites: #1254
> ---
>  src/box/box.cc    |  4 +++-
>  src/box/journal.c |  7 ++++---
>  src/box/journal.h |  9 +++++++--
>  src/box/wal.c     | 29 +++++++++--------------------
>  4 files changed, 23 insertions(+), 26 deletions(-)
> 
> diff --git a/src/box/wal.c b/src/box/wal.c
> index 0ea15a432..5951817d0 100644
> --- a/src/box/wal.c
> +++ b/src/box/wal.c
> @@ -1172,15 +1166,10 @@ wal_write(struct journal *journal, struct journal_entry *entry)
>  	batch->approx_len += entry->approx_len;
>  	writer->wal_pipe.n_input += entry->n_rows * XROW_IOVMAX;
>  	cpipe_flush_input(&writer->wal_pipe);
> -	/**
> -	 * It's not safe to spuriously wakeup this fiber
> -	 * since in that case it will ignore a possible
> -	 * error from WAL writer and not roll back the
> -	 * transaction.
> -	 */
> -	bool cancellable = fiber_set_cancellable(false);
> -	fiber_yield(); /* Request was inserted. */
> -	fiber_set_cancellable(cancellable);
> +
> +	while (!entry->done)
> +		fiber_cond_wait(&entry->done_cond);
> +

Using a fiber_cond to wake up a single fiber is an overkill. You could
as well do

	while (!entry->done)
		fiber_yield_timeout(TIMEOUT_INFINITY);

Anyway, I have my reservations re how you handle WAL writer wakeups.
Please see my comments to the final patch.


* Re: [tarantool-patches] [PATCH v3 08/14] wal: enable asyncronous wal writes
  2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 08/14] wal: enable asyncronous wal writes Georgy Kirichenko
@ 2019-06-13 14:21   ` Vladimir Davydov
  0 siblings, 0 replies; 42+ messages in thread
From: Vladimir Davydov @ 2019-06-13 14:21 UTC (permalink / raw)
  To: Georgy Kirichenko; +Cc: tarantool-patches

On Sun, Jun 09, 2019 at 11:44:37PM +0300, Georgy Kirichenko wrote:
> Allow sending a journal entry to the WAL without waiting until the write
> has finished. Two methods are introduced:
>  * the async_write method submits an entry to be written and returns 0 if
>  the entry was successfully scheduled;
>  * the async_wait method waits until the write has finished and returns
>  the result of the journal write.
> 
> Prerequisites: #1254
> ---
>  src/box/box.cc    | 30 ++++++++++++++++++++---
>  src/box/journal.c | 21 +++++++++++++++-
>  src/box/journal.h | 30 +++++++++++++++++++++++
>  src/box/wal.c     | 62 +++++++++++++++++++++++++++++++++++++++++------
>  4 files changed, 130 insertions(+), 13 deletions(-)
> 
> diff --git a/src/box/box.cc b/src/box/box.cc
> index a88e762c0..d0616095b 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -308,19 +308,41 @@ struct recovery_journal {
>   * min/max LSN of created LSM levels.
>   */
>  static int64_t
> -recovery_journal_write(struct journal *base,
> -		       struct journal_entry *entry)
> +recovery_journal_async_write(struct journal *base,
> +			     struct journal_entry *entry)

And in one of the following patches you rename it back to
recovery_journal_write...

This series needs to be resplit.

>  {
>  	struct recovery_journal *journal = (struct recovery_journal *) base;
> +	entry->res = vclock_sum(journal->vclock);
>  	entry->done = true;
>  	fiber_cond_broadcast(&entry->done_cond);
> -	return vclock_sum(journal->vclock);
> +	return 0;
> +}
> +
> +static int64_t
> +recovery_journal_async_wait(struct journal *base,
> +			    struct journal_entry *entry)
> +{
> +	(void) base;
> +	assert(entry->done);
> +	return entry->res;
> +}
> +
> +static int64_t
> +recovery_journal_write(struct journal *base,
> +		       struct journal_entry *entry)
> +{
> +	if (recovery_journal_async_write(base, entry) == 0)
> +		return recovery_journal_async_wait(base, entry);
> +	return -1;

The 'journal_write' callback looks exactly the same for all
implementations hence we could move this code up to the generic
level.

Anyway, as I said I have concerns re the wakeup design - see my comments
to the final patch.


* Re: [tarantool-patches] [PATCH v3 09/14] wal: a dedicated wal scheduling fiber
  2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 09/14] wal: a dedicated wal scheduling fiber Georgy Kirichenko
@ 2019-06-13 14:24   ` Vladimir Davydov
  2019-06-13 19:36     ` Георгий Кириченко
  0 siblings, 1 reply; 42+ messages in thread
From: Vladimir Davydov @ 2019-06-13 14:24 UTC (permalink / raw)
  To: Georgy Kirichenko; +Cc: tarantool-patches

On Sun, Jun 09, 2019 at 11:44:38PM +0300, Georgy Kirichenko wrote:
> In order to implement asynchronous transactions we need to run a
> transaction completion handler, but tx_prio is not able to yield.
> 
> Prerequisites: #1254
> ---
>  src/box/wal.c | 48 +++++++++++++++++++++++++++++++++++++++++-------
>  1 file changed, 41 insertions(+), 7 deletions(-)
> 
> diff --git a/src/box/wal.c b/src/box/wal.c
> index 86d021896..e868a8e71 100644
> --- a/src/box/wal.c
> +++ b/src/box/wal.c
> @@ -100,6 +100,12 @@ struct wal_writer
>  	struct cpipe wal_pipe;
>  	/** A memory pool for messages. */
>  	struct mempool msg_pool;
> +	/** A queue to schedule journal entry completions. */
> +	struct stailq schedule_queue;
> +	/** True if writer is in rollback state. */
> +	bool is_in_rollback;
> +	/** A condition to signal about new schedule queue entries. */
> +	struct fiber_cond schedule_cond;
>  	/* ----------------- wal ------------------- */
>  	/** A setting from instance configuration - rows_per_wal */
>  	int64_t wal_max_rows;
> @@ -254,17 +260,36 @@ xlog_write_entry(struct xlog *l, struct journal_entry *entry)
>  	return xlog_tx_commit(l);
>  }
>  
> +/*
> + * Tx schedule fiber function.
> + */
> +static int
> +tx_schedule_f(va_list ap)
> +{
> +	struct wal_writer *writer = va_arg(ap, struct wal_writer *);
> +	while (!fiber_is_cancelled()) {
> +		while (!stailq_empty(&writer->schedule_queue)) {
> +			struct journal_entry *req =
> +				stailq_shift_entry(&writer->schedule_queue,
> +						   struct journal_entry, fifo);
> +			req->done = true;
> +			fiber_cond_broadcast(&req->done_cond);
> +		}
> +		writer->is_in_rollback = false;
> +		fiber_cond_wait(&writer->schedule_cond);
> +	}
> +	return 0;
> +}
> +
>  /**
> - * Signal done condition.
> + * Attach requests to a scheduling queue.
>   */
>  static void
>  tx_schedule_queue(struct stailq *queue)
>  {
> -	struct journal_entry *req;
> -	stailq_foreach_entry(req, queue, fifo) {
> -		req->done = true;
> -		fiber_cond_broadcast(&req->done_cond);
> -	}
> +	struct wal_writer *writer = &wal_writer_singleton;
> +	stailq_concat(&writer->schedule_queue, queue);
> +	fiber_cond_signal(&writer->schedule_cond);

This adds an extra ctxsw to the relatively hot WAL writer wakeup path so
we can't commit it. I assume it's a temporary hack you added so that you
can submit these patches for review.

I understand why you're doing this, but the right way would be removing
yields from on_commit triggers. We need to make vy_log_write yield-free.


* Re: [tarantool-patches] [PATCH v3 10/14] core: latch_unlock_external routine
  2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 10/14] core: latch_unlock_external routine Georgy Kirichenko
@ 2019-06-13 14:27   ` Vladimir Davydov
  2019-06-13 19:38     ` Георгий Кириченко
  0 siblings, 1 reply; 42+ messages in thread
From: Vladimir Davydov @ 2019-06-13 14:27 UTC (permalink / raw)
  To: Georgy Kirichenko; +Cc: tarantool-patches

On Sun, Jun 09, 2019 at 11:44:39PM +0300, Georgy Kirichenko wrote:
> Allow unlocking a latch from a fiber which isn't the owner of the latch.
> This is required to process transaction triggers asynchronously.
> 
> Prerequisites: #1254
> ---
>  src/box/alter.cc     |  2 +-
>  src/lib/core/latch.h | 13 +++++++++++--
>  2 files changed, 12 insertions(+), 3 deletions(-)
> 
> diff --git a/src/box/alter.cc b/src/box/alter.cc
> index 671209b51..adf30c9b6 100644
> --- a/src/box/alter.cc
> +++ b/src/box/alter.cc
> @@ -3446,7 +3446,7 @@ unlock_after_dd(struct trigger *trigger, void *event)
>  {
>  	(void) trigger;
>  	(void) event;
> -	latch_unlock(&schema_lock);
> +	latch_unlock_external(&schema_lock);
>  }
>  
>  static void
> diff --git a/src/lib/core/latch.h b/src/lib/core/latch.h
> index 49c59cf63..b27d6dd08 100644
> --- a/src/lib/core/latch.h
> +++ b/src/lib/core/latch.h
> @@ -159,9 +159,8 @@ latch_trylock(struct latch *l)
>   * \copydoc box_latch_unlock
>   */
>  static inline void
> -latch_unlock(struct latch *l)
> +latch_unlock_external(struct latch *l)
>  {
> -	assert(l->owner == fiber());
>  	l->owner = NULL;
>  	if (!rlist_empty(&l->queue)) {
>  		struct fiber *f = rlist_first_entry(&l->queue,
> @@ -176,6 +175,16 @@ latch_unlock(struct latch *l)
>  	}
>  }
>  
> +/**
> + * \copydoc box_latch_unlock
> + */
> +static inline void
> +latch_unlock(struct latch *l)
> +{
> +	assert(l->owner == fiber());
> +	latch_unlock_external(l);
> +}

Again, looks like a temporary hack to me. I would rather not commit it.
The right way would be getting rid of the schema_lock. I know that you
have tried, but failed because of vylog. I'll look what we can do about
it.


* Re: [tarantool-patches] [PATCH v3 11/14] txn: introduce asynchronous txn commit
  2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 11/14] txn: introduce asynchronous txn commit Georgy Kirichenko
@ 2019-06-13 14:34   ` Vladimir Davydov
  2019-06-13 19:45     ` Георгий Кириченко
  0 siblings, 1 reply; 42+ messages in thread
From: Vladimir Davydov @ 2019-06-13 14:34 UTC (permalink / raw)
  To: Georgy Kirichenko; +Cc: tarantool-patches

On Sun, Jun 09, 2019 at 11:44:40PM +0300, Georgy Kirichenko wrote:
> diff --git a/src/box/txn.c b/src/box/txn.c
> index a08652af1..815d635fe 100644
> --- a/src/box/txn.c
> +++ b/src/box/txn.c
> @@ -318,22 +319,77 @@ fail:
>  	return -1;
>  }
>  
> +/**
> + * Complete transaction processing.
> + */
> +static void
> +txn_complete(struct txn *txn)
> +{
> +	if (txn->signature < 0) {
> +		if (txn->engine)
> +			engine_rollback(txn->engine, txn);
> +		/* Rollback triggers must not throw. */
> +		fiber_set_txn(fiber(), txn);
> +		if (txn->has_triggers &&
> +		    trigger_run(&txn->on_rollback, txn) != 0) {
> +			diag_log();
> +			unreachable();
> +			panic("rollback trigger failed");
> +		}
> +		fiber_set_txn(fiber(), NULL);
> +
> +		return;
> +	}
> +	/*
> +	 * Engine can be NULL if transaction contains IPROTO_NOP
> +	 * statements only.
> +	 */
> +	if (txn->engine != NULL)
> +		engine_commit(txn->engine, txn);
> +	/*
> +	 * The transaction is in the binary log. No action below
> +	 * may throw. In case an error has happened, there is
> +	 * no other option but terminate.
> +	 */
> +	fiber_set_txn(fiber(), txn);
> +	if (txn->has_triggers &&
> +	    trigger_run(&txn->on_commit, txn) != 0) {
> +		diag_log();
> +		unreachable();
> +		panic("commit trigger failed");
> +	}
> +
> +	fiber_set_txn(fiber(), NULL);

I don't see why we need to have in_txn() set in on_commit/on_rollback
triggers. Could you please point me to where it's needed?

> diff --git a/src/box/wal.c b/src/box/wal.c
> index e868a8e71..eff48b4fe 100644
> --- a/src/box/wal.c
> +++ b/src/box/wal.c
> @@ -272,6 +272,8 @@ tx_schedule_f(va_list ap)
>  			struct journal_entry *req =
>  				stailq_shift_entry(&writer->schedule_queue,
>  						   struct journal_entry, fifo);
> +			if (req->on_done_cb != NULL)
> +				req->on_done_cb(req, req->on_done_cb_data);
>  			req->done = true;
>  			fiber_cond_broadcast(&req->done_cond);

Why do we need cond if we have a callback? Can't we wake up the awaiting
fiber from the callback?


* Re: [tarantool-patches] [PATCH v3 12/14] txn: handle fiber stop event at transaction level
  2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 12/14] txn: handle fiber stop event at transaction level Georgy Kirichenko
@ 2019-06-13 14:36   ` Vladimir Davydov
  0 siblings, 0 replies; 42+ messages in thread
From: Vladimir Davydov @ 2019-06-13 14:36 UTC (permalink / raw)
  To: Georgy Kirichenko; +Cc: tarantool-patches

On Sun, Jun 09, 2019 at 11:44:41PM +0300, Georgy Kirichenko wrote:
> Get rid of the duplicated fiber-on-stop logic.

This patch is good, but apparently I can't apply it. Could you please
make it the first patch in the series?

General advice: arrange trivial patches and patches that don't raise
any questions to be first in patch set.


* Re: [tarantool-patches] [PATCH v3 13/14] applier: apply transaction in parallel
  2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 13/14] applier: apply transaction in parallel Georgy Kirichenko
@ 2019-06-13 15:17   ` Vladimir Davydov
  0 siblings, 0 replies; 42+ messages in thread
From: Vladimir Davydov @ 2019-06-13 15:17 UTC (permalink / raw)
  To: Georgy Kirichenko; +Cc: tarantool-patches

On Sun, Jun 09, 2019 at 11:44:42PM +0300, Georgy Kirichenko wrote:
> The applier uses asynchronous transactions to batch journal writes. A
> sequencer orders transaction execution and checks write results while
> the applier reads from the network.

In general, I have no objections against the way you handle this.
It does seem that we don't need to start a fiber for each transaction -
batching WAL writes should be enough in most cases, because memtx never
yields while vinyl usually doesn't yield on write (well, there are
UPDATE/INSERT but we can replace them with REPLACE on replica; before/on
replace triggers are slow by design anyway so we don't need to optimize
them).

However, I'd expect to see the reasoning in the commit message.

Also, I find the implementation a little bit too complex and have a
suggestion how we could possibly simplify it - see comments inline.

> 
> Closes: #1254
> ---
>  src/box/applier.cc | 372 ++++++++++++++++++++++++++++++---------------
>  src/box/applier.h  |   4 +
>  src/box/box.cc     |   1 +
>  3 files changed, 251 insertions(+), 126 deletions(-)
> 
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 5a92f6109..9f0efda5a 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -53,6 +53,231 @@
>  
>  STRS(applier_state, applier_STATE);
>  
> +/**
> + * Process a no-op request.
> + *
> + * A no-op request does not affect any space, but it
> + * promotes vclock and is written to WAL.
> + */
> +static int
> +process_nop(struct request *request)
> +{
> +	assert(request->type == IPROTO_NOP);
> +	struct txn *txn = in_txn();
> +	if (txn_begin_stmt(txn, NULL) == NULL)
> +		return -1;
> +	return txn_commit_stmt(txn, request);
> +}
> +
> +static int
> +apply_row(struct xrow_header *row)
> +{
> +	struct request request;
> +	if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0)
> +		return -1;
> +	if (request.type == IPROTO_NOP)
> +		return process_nop(&request);
> +	struct space *space = space_cache_find(request.space_id);
> +	if (space == NULL)
> +		return -1;
> +	if (box_process_rw(&request, space, NULL) != 0) {
> +		say_error("error applying row: %s", request_str(&request));
> +		return -1;
> +	}
> +	return 0;
> +}

Please avoid moving and adding/patching code in the same patch.
Split it if necessary. In this particular case I see no reason
for you to move the code.

> +
> +/**
> + * A helper struct to link xrow objects in a list.
> + */
> +struct applier_tx_row {
> +	/* Next transaction row. */
> +	struct stailq_entry next;
> +	/* xrow_header struct for the current transaction row. */
> +	struct xrow_header row;
> +};
> +
> +struct sequencer {
> +	struct stailq txn_queue;
> +	struct fiber_cond txn_queue_cond;
> +	struct vclock net_vclock;
> +	struct fiber_cond tx_vclock_cond;

This cond is never used...

> +	struct diag diag;
> +	struct rlist on_fail;
> +};
> +
> +static struct sequencer sequencer;
> +
> +static int
> +sequencer_collect_f(va_list ap)
> +{
> +	(void) ap;
> +	while (!fiber_is_cancelled()) {
> +		while (stailq_empty(&sequencer.txn_queue)) {
> +			if (!diag_is_empty(&sequencer.diag)) {
> +				diag_clear(&sequencer.diag);
> +				vclock_copy(&sequencer.net_vclock, &replicaset.vclock);
> +			}
> +			fiber_cond_wait(&sequencer.txn_queue_cond);
> +			continue;
> +		}
> +		struct txn *txn =
> +			stailq_shift_entry(&sequencer.txn_queue, struct txn,
> +					   in_txn_cache);
> +		if (txn_wait(txn) == 0) {
> +			continue;
> +		}
> +		if (diag_is_empty(&sequencer.diag)) {
> +			diag_move(&fiber()->diag, &sequencer.diag);
> +			trigger_run(&sequencer.on_fail, NULL);
> +		}
> +	}
> +	return 0;
> +}
> +
> +void
> +applier_init()
> +{
> +	stailq_create(&sequencer.txn_queue);
> +	fiber_cond_create(&sequencer.txn_queue_cond);
> +
> +	rlist_create(&sequencer.on_fail);
> +
> +	vclock_create(&sequencer.net_vclock);
> +	fiber_cond_create(&sequencer.tx_vclock_cond);
> +	diag_create(&sequencer.diag);
> +	struct fiber *collector = fiber_new("collector", sequencer_collect_f);
> +	if (collector == NULL)
> +		panic("Failed to create a sequencer collector fiber");
> +	fiber_start(collector, NULL);
> +}
> +
> +static inline void
> +sequencer_on_fail(struct trigger *on_fail)
> +{
> +	trigger_add(&sequencer.on_fail, on_fail);
> +}
> +
> +static void
> +sequencer_rollback_cb(struct trigger *trigger, void *event)
> +{
> +	(void) trigger;
> +	struct txn *txn = (struct txn *)event;
> +	stailq_add_tail(&sequencer.txn_queue, &txn->in_txn_cache);
> +	fiber_cond_signal(&sequencer.txn_queue_cond);
> +
> +	diag_set(ClientError, ER_WAL_IO);
> +	diag_move(&fiber()->diag, &sequencer.diag);
> +	trigger_run(&sequencer.on_fail, &sequencer);
> +}
> +
> +static void
> +sequencer_commit_cb(struct trigger *trigger, void *event)
> +{
> +	(void) trigger;
> +	(void) event;
> +	struct txn *txn = (struct txn *)event;
> +	stailq_add_tail(&sequencer.txn_queue, &txn->in_txn_cache);
> +	fiber_cond_signal(&sequencer.txn_queue_cond);
> +	fiber_cond_broadcast(&sequencer.tx_vclock_cond);
> +}
> +
> +static inline int
> +sequencer_submit(uint32_t replica_id, int64_t lsn, struct stailq *rows)
> +{
> +	struct replica *replica = replica_by_id(replica_id);
> +	struct latch *latch = (replica ? &replica->order_latch :
> +			      &replicaset.applier.order_latch);
> +
> +	latch_lock(latch);
> +	if (vclock_get(&sequencer.net_vclock, replica_id) >= lsn) {
> +		/* Nothing to do. */
> +		latch_unlock(latch);
> +		return 0;
> +	}
> +
> +	struct trigger *on_rollback;
> +	struct trigger *on_commit;
> +	/**
> +	 * Explicitly begin the transaction so that we can
> +	 * control fiber->gc life cycle and, in case of apply
> +	 * conflict safely access failed xrow object and allocate
> +	 * IPROTO_NOP on gc.
> +	 */
> +	struct txn *txn = txn_begin();
> +	if (txn == NULL)
> +		goto fail;
> +	struct applier_tx_row *item;
> +	stailq_foreach_entry(item, rows, next) {
> +		struct xrow_header *row = &item->row;
> +		int res = apply_row(row);
> +		if (res != 0) {
> +			struct error *e = diag_last_error(diag_get());
> +			/*
> +			 * In case of ER_TUPLE_FOUND error and enabled
> +			 * replication_skip_conflict configuration
> +			 * option, skip applying the foreign row and
> +			 * replace it with NOP in the local write ahead
> +			 * log.
> +			 */
> +			if (e->type == &type_ClientError &&
> +			    box_error_code(e) == ER_TUPLE_FOUND &&
> +			    replication_skip_conflict) {
> +				diag_clear(diag_get());
> +				row->type = IPROTO_NOP;
> +				row->bodycnt = 0;
> +				res = apply_row(row);
> +			}
> +		}
> +		if (res != 0)
> +			goto rollback;
> +	}
> +	/*
> +	 * We are going to commit so it's a high time to check if
> +	 * the current transaction has non-local effects.
> +	 */
> +	if (txn_is_distributed(txn)) {
> +		/*
> +		 * A transaction mixes remote and local rows.
> +		 * Local rows must be replicated back, which
> +		 * doesn't make sense since the master likely has
> +		 * new changes which local rows may overwrite.
> +		 * Raise an error.
> +		 */
> +		diag_set(ClientError, ER_UNSUPPORTED,
> +			 "Replication", "distributed transactions");
> +		goto rollback;
> +	}
> +
> +	/* We are ready to submit txn to wal. */
> +	on_rollback = (struct trigger *)txn_alloc(txn, sizeof(struct trigger));
> +	trigger_create(on_rollback, sequencer_rollback_cb, NULL, NULL);
> +	txn_on_rollback(txn, on_rollback);
> +
> +	on_commit = (struct trigger *)txn_alloc(txn, sizeof(struct trigger));
> +	trigger_create(on_commit, sequencer_commit_cb, NULL, NULL);
> +	txn_on_commit(txn, on_commit);
> +
> +	if (txn_write(txn) != 0)
> +		goto fail;
> +
> +	vclock_follow(&sequencer.net_vclock, replica_id, lsn);
> +	latch_unlock(latch);
> +	return 0;
> +
> +rollback:
> +	txn_rollback(txn);
> +
> +fail:
> +	latch_unlock(latch);
> +	if (diag_is_empty(&sequencer.diag)) {
> +		diag_add_error(&sequencer.diag, diag_last_error(&fiber()->diag));
> +		trigger_run(&sequencer.on_fail, NULL);
> +	}
> +	return -1;
> +}

The code is quite complex and without comments it's easy to get lost.
Please next time write proper comments explaining what all the actors
do.

Just so I understand what's going on here:

 1. applier performs requests comprising a transaction.
 2. applier submits WAL write asynchronously.
 3. wal appends the journal entry to its queue.
 4. applier sets on_commit/rollback trigger to add completed tx to
    sequencer queue.
 5. applier continues to the next transaction (goes to step 1).
 6. wal writes the request and performs the txn_complete callback.
    It also wakes up the fiber waiting for the transaction (hell,
    there's no such fiber in this case!)
 7. txn_complete callback runs on_commit trigger.
 8. on_commit trigger set by the applier adds the transaction to the
    sequencer queue.
 9. sequencer calls txn_wait. Since the transaction is complete,
    it just frees the transaction.

This is a good example of over-engineering IMHO :)

In particular, I find the sequencer entity redundant, because WAL
already acts as a request sequencer so we should reuse its queue
rather than introducing a new one.

Let's try to simplify things a bit:

 - Instead of having journal_async_write/journal_async_wait pair, let's
   introduce single method journal_submit that would take a callback
   to execute upon write completion. The method doesn't need to wake up
   the initiating fiber - it should be done by the callback itself, if
   necessary. No other methods are defined for journal.

 - Then add txn_commit and txn_commit_async that would do something like
   this:

     txn_commit:
       txn->is_async = false
       txn_do_commit()
       while (!txn_is_complete())
           fiber_yield_timeout()
       txn_free()

     txn_commit_async:
       txn->is_async = true
       txn_do_commit()

     txn_do_commit:
       do preparatory engine stuff
       wal_submit(txn_complete)

     txn_complete:
       do completing engine stuff
       run on_commit/rollback triggers
       if (txn->is_async)
           txn_free()
       else
           wakeup awaiting fiber

 - Now applier just sets on_rollback trigger to stop in case of WAL
   write error, then fires txn_commit_async and forgets about the txn -
   it'll get freed by WAL callback automatically upon completion.

No sequencer with a separate queue, no new triggers, no new journal
methods. Overall, should look much simpler. Have you considered this?
Are there any problems? Please check it out.


* Re: [tarantool-patches] [PATCH v3 06/14] txn: get rid of fiber_gc from txn_rollback
  2019-06-13 14:12   ` Vladimir Davydov
@ 2019-06-13 19:28     ` Георгий Кириченко
  2019-06-14  9:21       ` Vladimir Davydov
  0 siblings, 1 reply; 42+ messages in thread
From: Георгий Кириченко @ 2019-06-13 19:28 UTC (permalink / raw)
  To: Vladimir Davydov; +Cc: tarantool-patches

[-- Attachment #1: Type: text/plain, Size: 583 bytes --]

On Thursday, June 13, 2019 5:12:56 PM MSK Vladimir Davydov wrote:
> On Sun, Jun 09, 2019 at 11:44:35PM +0300, Georgy Kirichenko wrote:
> > Don't touch a fiber gc storage on a transaction rollback explicitly.
...
> 
> As I wrote earlier, I'm pretty convinced we don't need to call
> fiber_gc() after txn_commit/rollback at all. Please consider
> removing it in a separate patch.
I call fiber_gc() in these cases only to preserve the previous behavior.
I think we can remove fiber_gc() once all transaction data is placed on the
transaction memory region. We could discuss this face to face.

[-- Attachment #2: This is a digitally signed message part. --]
[-- Type: application/pgp-signature, Size: 488 bytes --]


* Re: [tarantool-patches] [PATCH v3 07/14] wal: remove fiber from a journal_entry structure
  2019-06-13 14:17   ` Vladimir Davydov
@ 2019-06-13 19:33     ` Георгий Кириченко
  2019-06-14  8:05       ` Vladimir Davydov
  0 siblings, 1 reply; 42+ messages in thread
From: Георгий Кириченко @ 2019-06-13 19:33 UTC (permalink / raw)
  To: Vladimir Davydov; +Cc: tarantool-patches

[-- Attachment #1: Type: text/plain, Size: 1174 bytes --]

On Thursday, June 13, 2019 5:17:02 PM MSK Vladimir Davydov wrote:
> On Sun, Jun 09, 2019 at 11:44:36PM +0300, Georgy Kirichenko wrote:
> > Use a fiber_cond to signal a condition and wake up a waiting fiber.
> > This relaxes friction between fiber and transaction life cycles.
> 
> I don't see how it relaxes anything ;-)
> 
> Please be more specific when writing comments.
Oh, thanks, I will do it
> 
> > +
> 
> Using a fiber_cond to wake up a single fiber is an overkill. You could
> as well do
> 
> 	while (!entry->done)
> 		fiber_yield_timeout(TIMEOUT_INFINITY);
That is not even possible. In the common case (an asynchronous transaction,
for instance) the WAL doesn't know which fiber should be woken up, or whether
such a fiber even exists. Consider the following case: an applier fired a
transaction and then died. The transaction is finished and could be finalized
in any fiber, which possibly no longer exists at that time. So the only way I
see to handle this is to let a fiber subscribe to transaction completion. If
you can offer a better solution, I would be happy.
> 
> Anyway, I have my reservations re how you handle WAL writer wakeups.
> Please see my comments to the final patch.




* Re: [tarantool-patches] [PATCH v3 09/14] wal: a dedicated wal scheduling fiber
  2019-06-13 14:24   ` Vladimir Davydov
@ 2019-06-13 19:36     ` Георгий Кириченко
  2019-06-14  9:20       ` Vladimir Davydov
  0 siblings, 1 reply; 42+ messages in thread
From: Георгий Кириченко @ 2019-06-13 19:36 UTC (permalink / raw)
  To: Vladimir Davydov; +Cc: tarantool-patches

[-- Attachment #1: Type: text/plain, Size: 674 bytes --]

On Thursday, June 13, 2019 5:24:52 PM MSK Vladimir Davydov wrote:
> On Sun, Jun 09, 2019 at 11:44:38PM +0300, Georgy Kirichenko wrote:

> 
> This adds an extra ctxsw to the relatively hot WAL writer wakeup path so
> we can't commit it. I assume it's a temporary hack you added so that you
> can submit this patches for review.
This adds at most one extra ctxsw per WAL batch, and we could ask Alexander
to benchmark it.
> 
> I understand why you're doing this, but the right way would be removing
> yields from on_commit triggers. We need to make vy_log_write yield-free.
I totally agree; however, I'm not sure we can disable yields for
on_commit/on_rollback right now.




* Re: [tarantool-patches] [PATCH v3 10/14] core: latch_unlock_external routine
  2019-06-13 14:27   ` Vladimir Davydov
@ 2019-06-13 19:38     ` Георгий Кириченко
  2019-06-14  8:10       ` Vladimir Davydov
  0 siblings, 1 reply; 42+ messages in thread
From: Георгий Кириченко @ 2019-06-13 19:38 UTC (permalink / raw)
  To: Vladimir Davydov; +Cc: tarantool-patches

[-- Attachment #1: Type: text/plain, Size: 504 bytes --]

On Thursday, June 13, 2019 5:27:35 PM MSK Vladimir Davydov wrote:
> On Sun, Jun 09, 2019 at 11:44:39PM +0300, Georgy Kirichenko wrote:
> 
> Again, looks like a temporary hack to me. I would rather not commit it.
> The right way would be getting rid of the schema_lock. I know that you
> have tried, but failed because of vylog. I'll look what we can do about
> it.
I failed to remove the latch from checkpointing, not from DDL. Removing the
latch from alter.cc doesn't look feasible within the next 2-3 months.




* Re: [tarantool-patches] [PATCH v3 11/14] txn: introduce asynchronous txn commit
  2019-06-13 14:34   ` Vladimir Davydov
@ 2019-06-13 19:45     ` Георгий Кириченко
  2019-06-14  7:58       ` Vladimir Davydov
  0 siblings, 1 reply; 42+ messages in thread
From: Георгий Кириченко @ 2019-06-13 19:45 UTC (permalink / raw)
  To: Vladimir Davydov; +Cc: tarantool-patches

[-- Attachment #1: Type: text/plain, Size: 1513 bytes --]

On Thursday, June 13, 2019 5:34:00 PM MSK Vladimir Davydov wrote:
> On Sun, Jun 09, 2019 at 11:44:40PM +0300, Georgy Kirichenko wrote:
> > diff --git a/src/box/txn.c b/src/box/txn.c

> I don't see why we need to have in_txn() in on_commit/rollback triggers.
> Could you please point me?
Unfortunately, Lua on_commit triggers use it (because of vshard requirements),
and I don't see a right way to remove it.
> 
> > diff --git a/src/box/wal.c b/src/box/wal.c
> > index e868a8e71..eff48b4fe 100644
> > --- a/src/box/wal.c
> > +++ b/src/box/wal.c
> > @@ -272,6 +272,8 @@ tx_schedule_f(va_list ap)
> > 
> >  			struct journal_entry *req =
> >  			
> >  				stailq_shift_entry(&writer->schedule_queue,
> >  				
> >  						   struct journal_entry, fifo);
> > 
> > +			if (req->on_done_cb != NULL)
> > +				req->on_done_cb(req, req->on_done_cb_data);
> > 
> >  			req->done = true;
> >  			fiber_cond_broadcast(&req->done_cond);
> 
> Why do we need cond if we have a callback? Can't we wake up the awaiting
> fiber from the callback?
The condition and the done variable mark the time when all transaction
data can be purged. It definitely cannot be done inside on_done_cb, because
we would lose the transaction status in that case. I think there are ways to
handle this, but the solution would be too complicated.
I use the callback to process txn finalization and to run the
on_commit/on_rollback triggers; one of them signals an applier fiber to
collect the transaction and determine the replication state.




* Re: [tarantool-patches] [PATCH v3 11/14] txn: introduce asynchronous txn commit
  2019-06-13 19:45     ` Георгий Кириченко
@ 2019-06-14  7:58       ` Vladimir Davydov
  0 siblings, 0 replies; 42+ messages in thread
From: Vladimir Davydov @ 2019-06-14  7:58 UTC (permalink / raw)
  To: Георгий
	Кириченко
  Cc: tarantool-patches

On Thu, Jun 13, 2019 at 10:45:18PM +0300, Георгий Кириченко wrote:
> On Thursday, June 13, 2019 5:34:00 PM MSK Vladimir Davydov wrote:
> > On Sun, Jun 09, 2019 at 11:44:40PM +0300, Georgy Kirichenko wrote:
> > > diff --git a/src/box/txn.c b/src/box/txn.c
> 
> > I don't see why we need to have in_txn() in on_commit/rollback triggers.
> > Could you please point me?
> Unfortunately, Lua on_commit triggers use it (because of vshard
> requirements), and I don't see a right way to remove it.

Okay, I see. Let's add a comment please that this is needed for Lua
triggers. Just curious, how can it possibly use it?

> > 
> > > diff --git a/src/box/wal.c b/src/box/wal.c
> > > index e868a8e71..eff48b4fe 100644
> > > --- a/src/box/wal.c
> > > +++ b/src/box/wal.c
> > > @@ -272,6 +272,8 @@ tx_schedule_f(va_list ap)
> > > 
> > >  			struct journal_entry *req =
> > >  			
> > >  				stailq_shift_entry(&writer->schedule_queue,
> > >  				
> > >  						   struct journal_entry, fifo);
> > > 
> > > +			if (req->on_done_cb != NULL)
> > > +				req->on_done_cb(req, req->on_done_cb_data);
> > > 
> > >  			req->done = true;
> > >  			fiber_cond_broadcast(&req->done_cond);
> > 
> > Why do we need cond if we have a callback? Can't we wake up the awaiting
> > fiber from the callback?
> The condition and the done variable mark the time when all transaction
> data can be purged.

I think we could purge async transactions right in the callback.

> It definitely cannot be done inside on_done_cb, because we would lose
> the transaction status in that case. I think there are ways to handle
> this, but the solution would be too complicated.

I don't get what you mean by "losing the transaction status". Could you
elaborate, please?

> I use the callback to process txn finalization and to call the
> on_commit/on_rollback triggers; one of them signals an applier fiber to
> collect the transaction and determine the replication state.

Why can't we collect the transaction right from the completion callback?
I think we could propagate a failure to the applier via an on_rollback
trigger. Is something wrong with that?
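
For illustration, a minimal self-contained sketch of that idea - finalizing
an async transaction straight from the WAL completion callback and signaling
failure through the rollback path - might look like the following. All
structure and function names here are simplified stand-ins, not the real
Tarantool ones:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Simplified stand-ins for the real journal_entry/txn structures. */
struct journal_entry;
typedef void (*on_done_f)(struct journal_entry *entry, void *data);

struct journal_entry {
	bool done;
	int res;                 /* WAL write result: >= 0 ok, < 0 error */
	on_done_f on_done_cb;
	void *on_done_cb_data;
};

struct txn {
	bool is_async;
	bool collected;          /* collected/freed by the callback */
	bool rollback_seen;      /* failure propagated to the applier */
};

/*
 * Completion callback run by the WAL scheduler: for an async txn
 * nobody is cond-waiting on entry->done, so the txn can be collected
 * right here, and a WAL error goes through the rollback path.
 */
static void
txn_on_journal_done(struct journal_entry *entry, void *data)
{
	struct txn *txn = data;
	if (entry->res < 0)
		txn->rollback_seen = true;  /* would fire on_rollback triggers */
	if (txn->is_async)
		txn->collected = true;      /* would free the txn right away */
	entry->done = true;
}

/* What the scheduler loop would do per completed entry. */
static void
tx_schedule_entry(struct journal_entry *entry, int wal_result)
{
	entry->res = wal_result;
	if (entry->on_done_cb != NULL)
		entry->on_done_cb(entry, entry->on_done_cb_data);
}

/* Tiny driver: returns 1 if both the success and error paths behave. */
static int
run_demo(void)
{
	struct txn ok_txn = { .is_async = true };
	struct journal_entry ok = { .on_done_cb = txn_on_journal_done,
				    .on_done_cb_data = &ok_txn };
	tx_schedule_entry(&ok, 0);

	struct txn bad_txn = { .is_async = true };
	struct journal_entry bad = { .on_done_cb = txn_on_journal_done,
				     .on_done_cb_data = &bad_txn };
	tx_schedule_entry(&bad, -1);

	return ok.done && ok_txn.collected && !ok_txn.rollback_seen &&
	       bad.done && bad_txn.collected && bad_txn.rollback_seen;
}
```

The point of the sketch is only that no fiber_cond is needed on the async
path: the callback both collects the transaction and carries the error.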

^ permalink raw reply	[flat|nested] 42+ messages in thread

* Re: [tarantool-patches] [PATCH v3 07/14] wal: remove fiber from a journal_entry structure
  2019-06-13 19:33     ` Георгий Кириченко
@ 2019-06-14  8:05       ` Vladimir Davydov
  0 siblings, 0 replies; 42+ messages in thread
From: Vladimir Davydov @ 2019-06-14  8:05 UTC (permalink / raw)
  To: Георгий
	Кириченко
  Cc: tarantool-patches

On Thu, Jun 13, 2019 at 10:33:37PM +0300, Георгий Кириченко wrote:
> > Using a fiber_cond to wake up a single fiber is an overkill. You could
> > as well do
> > 
> > 	while (!entry->done)
> > 		fiber_yield_timeout(TIMEOUT_INFINITY);
> It is not even possible. In the common case (an asynchronous transaction,
> for instance) the WAL doesn't know which fiber should be woken, or even
> whether such a fiber exists. Please consider the following case: an applier
> fired a transaction and died. The transaction is finished and could be
> finalized in any fiber, which possibly doesn't exist at that time. So the
> only way I see to handle this is to let a fiber subscribe to transaction
> completion. If you could offer me a better solution, I would be happy.

It would be nice to see this reasoning in a comment to the code.

However, I don't quite agree. An applier fires async transactions, which
it doesn't really need to wait for, as we can free async transactions
right from the completion callback. To stop the applier on a WAL error we
could use an on_rollback trigger AFAICS.

Regarding sync transactions, we can make the fiber waiting for them to
complete non-cancellable, just like we do now.
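
The two positions can be contrasted in a small sketch (hypothetical names;
the real code uses fiber_cond and the fiber API): a sync committer registers
itself as the waiter before submitting, so the WAL never has to guess which
fiber to wake, while an async entry has no waiter and is collected on the
spot.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Stand-in for a fiber that may need to be woken up. */
struct fake_fiber {
	bool woken;
};

struct wal_entry {
	bool done;
	struct fake_fiber *waiter;  /* NULL for async transactions */
	bool collected;
};

/*
 * Completion handler: wake the subscribed waiter if there is one,
 * otherwise finalize the entry immediately - nobody will come back
 * for it, even if the submitting fiber has already died.
 */
static void
wal_entry_complete(struct wal_entry *e)
{
	e->done = true;
	if (e->waiter != NULL)
		e->waiter->woken = true;    /* real code: fiber_wakeup() */
	else
		e->collected = true;        /* real code: free the txn */
}

static int
run_demo(void)
{
	struct fake_fiber committer = { .woken = false };
	struct wal_entry sync_e = { .waiter = &committer };
	struct wal_entry async_e = { .waiter = NULL };

	wal_entry_complete(&sync_e);
	wal_entry_complete(&async_e);

	return sync_e.done && committer.woken && !sync_e.collected &&
	       async_e.done && async_e.collected;
}
```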

^ permalink raw reply	[flat|nested] 42+ messages in thread

* Re: [tarantool-patches] [PATCH v3 10/14] core: latch_unlock_external routine
  2019-06-13 19:38     ` Георгий Кириченко
@ 2019-06-14  8:10       ` Vladimir Davydov
  2019-06-14  9:18         ` Vladimir Davydov
  0 siblings, 1 reply; 42+ messages in thread
From: Vladimir Davydov @ 2019-06-14  8:10 UTC (permalink / raw)
  To: Георгий
	Кириченко
  Cc: tarantool-patches

On Thu, Jun 13, 2019 at 10:38:12PM +0300, Георгий Кириченко wrote:
> On Thursday, June 13, 2019 5:27:35 PM MSK Vladimir Davydov wrote:
> > On Sun, Jun 09, 2019 at 11:44:39PM +0300, Georgy Kirichenko wrote:
> > 
> > Again, looks like a temporary hack to me. I would rather not commit it.
> > The right way would be getting rid of the schema_lock. I know that you
> > have tried, but failed because of vylog. I'll look what we can do about
> > it.
> I failed to remove the latch from checkpointing, not from DDL. Removing the
> latch from alter.cc doesn't look possible in the next 2-3 months.

I'll look into what we can do about this - AFAICS there are no problems
with memtx. We just need to tweak vinyl somehow.

Anyway, rather than introducing a new fiber, we could use the 'tx'
endpoint instead of the 'tx_prio' endpoint, just as we used to. It would
be equivalent, but look more straightforward AFAIU. That would be a kind
of regression, though, so I would prefer to avoid it if possible and only
use it as a last resort.

^ permalink raw reply	[flat|nested] 42+ messages in thread

* Re: [tarantool-patches] [PATCH v3 10/14] core: latch_unlock_external routine
  2019-06-14  8:10       ` Vladimir Davydov
@ 2019-06-14  9:18         ` Vladimir Davydov
  0 siblings, 0 replies; 42+ messages in thread
From: Vladimir Davydov @ 2019-06-14  9:18 UTC (permalink / raw)
  To: Георгий
	Кириченко
  Cc: tarantool-patches

On Fri, Jun 14, 2019 at 11:10:08AM +0300, Vladimir Davydov wrote:
> On Thu, Jun 13, 2019 at 10:38:12PM +0300, Георгий Кириченко wrote:
> > On Thursday, June 13, 2019 5:27:35 PM MSK Vladimir Davydov wrote:
> > > On Sun, Jun 09, 2019 at 11:44:39PM +0300, Georgy Kirichenko wrote:
> > > 
> > > Again, looks like a temporary hack to me. I would rather not commit it.
> > > The right way would be getting rid of the schema_lock. I know that you
> > > have tried, but failed because of vylog. I'll look what we can do about
> > > it.
> > I failed to remove the latch from checkpointing, not from DDL. Removing
> > the latch from alter.cc doesn't look possible in the next 2-3 months.
> 
> I'll look into what we can do about this - AFAICS there are no problems
> with memtx. We just need to tweak vinyl somehow.

Discussed f2f. Looks like we can't easily remove the latch, because we
do need it for syncing DDL-vs-DDL (we can remove it from checkpointing,
but that's not enough).

Still, latch_unlock_external looks so ugly :(

Maybe we could move it to be called before submitting a record to WAL
rather than after? Something like a before_commit trigger, maybe?
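
A rough sketch of that "release the latch before the WAL write" idea
(before_commit here is a hypothetical hook, not an existing Tarantool
trigger, and the types are toy stand-ins):

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Simplified schema latch. */
struct latch {
	bool locked;
};

struct txn_ctx {
	struct latch *ddl_latch;            /* held during DDL, may be NULL */
	bool latch_held_during_wal_write;   /* what we want to avoid */
};

/*
 * Hypothetical before_commit hook: drop the latch before the record
 * leaves the tx thread, so the (possibly long) WAL wait happens
 * without holding it.
 */
static void
before_commit_release_latch(struct txn_ctx *txn)
{
	if (txn->ddl_latch != NULL && txn->ddl_latch->locked)
		txn->ddl_latch->locked = false;
}

static void
submit_to_wal(struct txn_ctx *txn)
{
	before_commit_release_latch(txn);
	/* Here the record would be handed over to the WAL thread. */
	txn->latch_held_during_wal_write =
		txn->ddl_latch != NULL && txn->ddl_latch->locked;
}

static int
run_demo(void)
{
	struct latch schema_lock = { .locked = true };
	struct txn_ctx ddl_txn = { .ddl_latch = &schema_lock };
	submit_to_wal(&ddl_txn);
	return !schema_lock.locked && !ddl_txn.latch_held_during_wal_write;
}
```

This would make the ugly latch_unlock_external call unnecessary, at the
price of deciding whether releasing the latch before the write is safe
for DDL-vs-DDL ordering.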

^ permalink raw reply	[flat|nested] 42+ messages in thread

* Re: [tarantool-patches] [PATCH v3 09/14] wal: a dedicated wal scheduling fiber
  2019-06-13 19:36     ` Георгий Кириченко
@ 2019-06-14  9:20       ` Vladimir Davydov
  0 siblings, 0 replies; 42+ messages in thread
From: Vladimir Davydov @ 2019-06-14  9:20 UTC (permalink / raw)
  To: Георгий
	Кириченко
  Cc: tarantool-patches

On Thu, Jun 13, 2019 at 10:36:30PM +0300, Георгий Кириченко wrote:
> On Thursday, June 13, 2019 5:24:52 PM MSK Vladimir Davydov wrote:
> > On Sun, Jun 09, 2019 at 11:44:38PM +0300, Georgy Kirichenko wrote:
> 
> > 
> > This adds an extra ctxsw to the relatively hot WAL writer wakeup path so
> > we can't commit it. I assume it's a temporary hack you added so that you
> > can submit this patches for review.
> This adds at most one ctxsw on each WAL branch, and we could ask Alexander
> to benchmark it.
> > 
> > I understand why you're doing this, but the right way would be removing
> > yields from on_commit triggers. We need to make vy_log_write yield-free.
> I totally agree; however, I'm not sure we can disable yields for
> on_commit/on_rollback right now.

I'll try to do that without introducing a separate vylog thread then, so
as not to trigger Kostja. If I fail, we will have no choice but to fall
back on a separate fiber for WAL wakeups.

^ permalink raw reply	[flat|nested] 42+ messages in thread

* Re: [tarantool-patches] [PATCH v3 06/14] txn: get rid of fiber_gc from txn_rollback
  2019-06-13 19:28     ` Георгий Кириченко
@ 2019-06-14  9:21       ` Vladimir Davydov
  0 siblings, 0 replies; 42+ messages in thread
From: Vladimir Davydov @ 2019-06-14  9:21 UTC (permalink / raw)
  To: Георгий
	Кириченко
  Cc: tarantool-patches

On Thu, Jun 13, 2019 at 10:28:24PM +0300, Георгий Кириченко wrote:
> On Thursday, June 13, 2019 5:12:56 PM MSK Vladimir Davydov wrote:
> > On Sun, Jun 09, 2019 at 11:44:35PM +0300, Georgy Kirichenko wrote:
> > > Don't touch a fiber gc storage on a transaction rollback explicitly.
> ...
> > 
> > As I wrote earlier, I'm pretty convinced we don't need to call
> > fiber_gc() after txn_commit/rollback at all. Please consider
> > removing it in a separate patch.
> I call fiber_gc in such cases only in order to preserve the previous
> behavior. I think we can remove fiber_gc once all transaction data is
> placed on the transaction memory region. We could talk about it f2f.

Discussed f2f. Agreed to try to remove fiber_gc() calls after
txn_commit/rollback() first.

^ permalink raw reply	[flat|nested] 42+ messages in thread

* [tarantool-patches] Re: [PATCH v3 05/14] txn: get rid of autocommit from a txn structure
  2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 05/14] txn: get rid of autocommit from a txn structure Georgy Kirichenko
  2019-06-13 14:11   ` Vladimir Davydov
@ 2019-06-16 16:14   ` Konstantin Osipov
  1 sibling, 0 replies; 42+ messages in thread
From: Konstantin Osipov @ 2019-06-16 16:14 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

* Georgy Kirichenko <georgy@tarantool.org> [19/06/09 23:47]:
> Move transaction auto start and auto commit behavior to the box level.
> From now on, a transaction won't start or commit automatically without
> txn_begin/txn_commit invocations. This is a part of a bigger transaction
> refactoring in order to implement detachable transactions and a parallel
> applier.

The patch is now LGTM. I agree with most of Vladimir's comments
but I think they can be done in a separate patch.

I will reply to his review.


-- 
Konstantin Osipov, Moscow, Russia

^ permalink raw reply	[flat|nested] 42+ messages in thread

* [tarantool-patches] Re: [PATCH v3 05/14] txn: get rid of autocommit from a txn structure
  2019-06-13 14:11   ` Vladimir Davydov
@ 2019-06-16 16:20     ` Konstantin Osipov
  0 siblings, 0 replies; 42+ messages in thread
From: Konstantin Osipov @ 2019-06-16 16:20 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

* Vladimir Davydov <vdavydov.dev@gmail.com> [19/06/13 17:18]:
> On Sun, Jun 09, 2019 at 11:44:34PM +0300, Georgy Kirichenko wrote:
> > Move transaction auto start and auto commit behavior to the box level.
> > From now on, a transaction won't start or commit automatically without
> > txn_begin/txn_commit invocations. This is a part of a bigger transaction
> > refactoring in order to implement detachable transactions and a parallel
> > applier.
> 
> TBH I don't understand why you need to move the autocommit logic out of
> txn. The applier code already calls txn_begin/txn_commit explicitly, and
> AFAIU this is the only place you need to care about. The code looks less
> readable and more bulky after this change IMHO. Let's discuss f2f.

We duplicate the autocommit logic at the box and SQL levels, which is
confusing. This is why we decided to get rid of it altogether in box.
I think that when the refactoring is finished the box code won't look
that ugly, especially after your suggestions.

> fiber_gc() must be called after txn_commit() while txn_rollback() calls
> it by itself. This is a mess.

It is temporary: the patch tries to do equivalent transformations of the
source code; the issue is addressed by a subsequent patch.

> why we need to call this function at all now, when we have txn->region
> which is used to allocate all txn stuff. AFAIU fiber->gc is only used
> for allocating temporary objects, in which case the caller is supposed
> to clean up by himself (using region_truncate). That said, I don't think
> we need to call fiber_gc() after txn_commit() at all.

Indeed, once most of the transaction memory is allocated on the txn
region, txn_commit() will free that memory automatically.
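
The allocation discipline described above - a caller records how much of
the region is used, allocates temporaries, then truncates back - can be
sketched with a toy bump allocator. region_used/region_alloc/region_truncate
here only loosely mirror Tarantool's real region API from the small library:

```c
#include <assert.h>
#include <stddef.h>

/* Toy bump-allocator region, loosely mirroring struct region. */
struct toy_region {
	char buf[4096];
	size_t used;
};

static size_t
region_used(const struct toy_region *r)
{
	return r->used;
}

static void *
region_alloc(struct toy_region *r, size_t size)
{
	if (r->used + size > sizeof(r->buf))
		return NULL;
	void *ptr = r->buf + r->used;
	r->used += size;
	return ptr;
}

static void
region_truncate(struct toy_region *r, size_t used)
{
	r->used = used;
}

/*
 * A caller that needs temporaries cleans up after itself with a
 * savepoint + truncate pair, so no blanket fiber_gc() call after
 * txn_commit()/txn_rollback() is needed.
 */
static int
run_demo(void)
{
	struct toy_region gc = { .used = 0 };
	size_t svp = region_used(&gc);          /* savepoint */
	void *tmp = region_alloc(&gc, 128);     /* temporary object */
	int allocated = tmp != NULL && region_used(&gc) == svp + 128;
	region_truncate(&gc, svp);              /* caller-local cleanup */
	return allocated && region_used(&gc) == svp;
}
```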

> There's one exception though - it's vdbe_add_new_autoinc_id, which
> stores autogenerated ids on fiber->gc and doesn't delete them until
> the corresponding SQL request is complete (this is why all this mess
> with fiber_gc() started AFAIR). However, I think we could and should
> use a mempool for such allocations.
> 
> I think that before pushing this patch, we should clean up fiber_gc()
> usage, otherwise it's really easy to lose track of where we must and where
> we mustn't call fiber_gc() now. Actually, I think we should do that
> irrespective of whether we commit this patch or not.

No, I disagree: that would be unnecessary work; we just need to finish
the patch set and push it.

> > @@ -403,8 +418,16 @@ applier_join(struct applier *applier)
> >  		applier->last_row_time = ev_monotonic_now(loop());
> >  		if (iproto_type_is_dml(row.type)) {
> >  			vclock_follow_xrow(&replicaset.vclock, &row);
> > -			if (apply_row(&row) != 0)
> > +			struct txn *txn = txn_begin();
> > +			if (txn == NULL)
> > +				diag_raise();
> > +			if (apply_row(&row) != 0) {
> > +				txn_rollback();
> > +				diag_raise();
> > +			}
> > +			if (txn_commit(txn) != 0)
> >  				diag_raise();
> > +			fiber_gc();
> 
> I'd wrap this into a helper function (apply_final_join_row).

Ok, why not.

> > diff --git a/src/box/box.cc b/src/box/box.cc
> > index 57419ee01..7f23716e5 100644
> > --- a/src/box/box.cc
> > +++ b/src/box/box.cc
> > @@ -169,34 +169,62 @@ int
> >  box_process_rw(struct request *request, struct space *space,
> >  	       struct tuple **result)
> >  {
> > +	struct tuple *tuple = NULL;
> > +	struct txn *txn = in_txn();
> > +	bool is_autocommit = txn == NULL;
> > +	if (is_autocommit && (txn = txn_begin()) == NULL)
> > +		return -1;
> >  	assert(iproto_type_is_dml(request->type));
> >  	rmean_collect(rmean_box, request->type, 1);
> >  	if (access_check_space(space, PRIV_W) != 0)
> > -		return -1;
> > -	struct txn *txn = txn_begin_stmt(space);
> > -	if (txn == NULL)
> > -		return -1;
> > -	struct tuple *tuple;
> > +		goto fail;
> > +	if (txn_begin_stmt(txn, space) == NULL)
> > +		goto fail;
> >  	if (space_execute_dml(space, txn, request, &tuple) != 0) {
> > -		txn_rollback_stmt();
> > -		return -1;
> > +		txn_rollback_stmt(txn);
> > +		goto fail;
> > +	}
> > +	if (result != NULL)
> > +		*result = tuple;
> > +
> > +	if (result == NULL || tuple == NULL) {
> > +		if (txn_commit_stmt(txn, request) != 0)
> > +			goto fail;
> > +		if (is_autocommit) {
> > +			if (txn_commit(txn) != 0)
> > +				return -1;
> > +			fiber_gc();
> > +		}
> > +		return 0;
> >  	}
> > -	if (result == NULL)
> > -		return txn_commit_stmt(txn, request);
> > -	*result = tuple;
> > -	if (tuple == NULL)
> > -		return txn_commit_stmt(txn, request);
> >  	/*
> >  	 * Pin the tuple locally before the commit,
> >  	 * otherwise it may go away during yield in
> >  	 * when WAL is written in autocommit mode.
> >  	 */
> >  	tuple_ref(tuple);
> > -	int rc = txn_commit_stmt(txn, request);
> > -	if (rc == 0)
> > -		tuple_bless(tuple);
> > +
> > +	if (txn_commit_stmt(txn, request)) {
> > +		/* Unref tuple and rollback if autocommit. */
> > +		tuple_unref(tuple);
> > +		goto fail;
> > +	}
> > +	if (is_autocommit) {
> > +		if (txn_commit(txn) != 0) {
> > +			/* Unref tuple and exit. */
> > +			tuple_unref(tuple);
> > +			return -1;
> > +		}
> > +	        fiber_gc();
> > +	}
> > +	tuple_bless(tuple);
> 
> This looks like a lot of copy-n-paste. Please think on how to get rid of
> code duplication.

This is already after my changes are applied. I think this place needs
to be revisited once the txn allocation patch is in, not in the middle
of a patch stack.


> > @@ -299,10 +327,20 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row)
> >  	xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
> >  	if (request.type != IPROTO_NOP) {
> >  		struct space *space = space_cache_find_xc(request.space_id);
> > +		struct txn *txn = txn_begin();
> > +		if (txn == NULL) {
> > +			say_error("error applying row: %s", request_str(&request));
> > +			diag_raise();
> > +		}
> 
> Why? box_process_rw can handle autocommit statements.

Uhm, good point. Let's try not to add transaction management here.

> 
> >  		if (box_process_rw(&request, space, NULL) != 0) {
> >  			say_error("error applying row: %s", request_str(&request));
> > +			txn_rollback();
> >  			diag_raise();
> >  		}
> > +		if (txn_commit(txn) != 0) {
> > +			diag_raise();
> > +		}
> > +		fiber_gc();
> >  	}
> >  	struct wal_stream *xstream =
> >  		container_of(stream, struct wal_stream, base);
> > @@ -1313,9 +1351,17 @@ box_sequence_reset(uint32_t seq_id)
> >  static inline void
> >  box_register_replica(uint32_t id, const struct tt_uuid *uuid)
> >  {
> > +	struct txn *txn = txn_begin();
> > +	if (txn == NULL)
> > +		diag_raise();
> >  	if (boxk(IPROTO_INSERT, BOX_CLUSTER_ID, "[%u%s]",
> 
> Again, boxk call box_process_rw, which handles autocommit. No need to
> call txn_begin/txn_commit here explicitly.

Agree.

> > @@ -1636,10 +1682,18 @@ box_set_replicaset_uuid(const struct tt_uuid *replicaset_uuid)
> >  		uu = *replicaset_uuid;
> >  	else
> >  		tt_uuid_create(&uu);
> > +	struct txn *txn = txn_begin();
> > +	if (txn == NULL)
> > +		diag_raise();
> >  	/* Save replica set UUID in _schema */
> >  	if (boxk(IPROTO_INSERT, BOX_SCHEMA_ID, "[%s%s]", "cluster",
> 
> Same here.

Agree.

> > diff --git a/src/box/index.cc b/src/box/index.cc
> > index 4a444e5d0..7f26c9bc2 100644
> > --- a/src/box/index.cc
> > +++ b/src/box/index.cc
> > @@ -274,7 +274,7 @@ box_index_min(uint32_t space_id, uint32_t index_id, const char *key,
> >  	if (txn_begin_ro_stmt(space, &txn) != 0)
> >  		return -1;
> >  	if (index_min(index, key, part_count, result) != 0) {
> > -		txn_rollback_stmt();
> > +		txn_rollback_stmt(txn);
> 
> In general I like that txn_rollback_stmt and txn_rollback now take txn.
> This makes the API more symmetric. Probably, worth doing in a separate
> patch, even if we realize we don't need this one. As a matter of fact,
> doing this in a separate patch would make review easier. I'd appreciate
> if you could split this patch.

Agree.

> > diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
> > index f4312484a..149215b87 100644
> > --- a/src/box/memtx_engine.c
> > +++ b/src/box/memtx_engine.c
> > @@ -272,16 +272,22 @@ memtx_engine_recover_snapshot_row(struct memtx_engine *memtx,
> >  		diag_set(ClientError, ER_CROSS_ENGINE_TRANSACTION);
> >  		return -1;
> >  	}
> > +	struct txn *txn = txn_begin();
> > +	if (txn == NULL)
> > +		return -1;
> >  	/* no access checks here - applier always works with admin privs */
> > -	if (space_apply_initial_join_row(space, &request) != 0)
> > +	if (space_apply_initial_join_row(space, &request) != 0) {
> > +		txn_rollback();
> >  		return -1;
> > +	}
> > +	int rc = txn_commit(txn);
> 
> This is the second place where you surround apply_initial_join_row with
> txn_begin/txn_commit. May be, worth doing it in the callback itself?
> Note, we only need this in case of memtx - vinyl doesn't create a txn
> object for initial recover or join.

Agree.

> 
> > diff --git a/src/box/memtx_space.c b/src/box/memtx_space.c
> > index 1d209033c..1e630783b 100644
> > --- a/src/box/memtx_space.c
> > +++ b/src/box/memtx_space.c
> > @@ -310,10 +310,10 @@ memtx_space_apply_initial_join_row(struct space *space, struct request *request)
> >  		return -1;
> >  	}
> >  	request->header->replica_id = 0;
> > -	struct txn *txn = txn_begin_stmt(space);
> > -	if (txn == NULL)
> > +	struct txn *txn = in_txn();
> > +	struct txn_stmt *stmt = txn_begin_stmt(txn, space);
> > +	if (stmt == NULL)
> >  		return -1;
> > -	struct txn_stmt *stmt = txn_current_stmt(txn);
> 
> AFAICS this is the only place where you use txn_stmt returned by
> txn_begin_stmt() - in other places you simply check if it's NULL (which
> indicates an error). Why did you decide to make txn_begin_stmt() return
> txn_stmt rather than an error code? The API looks lopsided now:
> 
>   txn_begin() => txn
>   txn_begin_stmt(txn, request) => txn_stmt
>   txn_commit_stmt(txn) => int
>   txn_commit(txn) => int
> 
> Let's please make txn_begin_stmt return int and use txn_current_stmt
> here to get txn_stmt, as we used to. This shouldn't affect performance
> while the code would look neater this way.

Agree.
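
The symmetric API asked for above would make every begin/commit pair
return the same kinds of values. A sketch with stub types follows; the
real declarations live in src/box/txn.h and differ in detail:

```c
#include <assert.h>
#include <stddef.h>

/* Stub types standing in for the real txn machinery. */
struct txn_stmt { int dummy; };
struct txn {
	struct txn_stmt current;
	int stmt_active;
};

/*
 * Symmetric shape:
 *   txn_begin()       => struct txn *
 *   txn_begin_stmt()  => int
 *   txn_commit_stmt() => int
 *   txn_commit()      => int
 * The statement object is fetched via txn_current_stmt() when needed.
 */
static int
txn_begin_stmt(struct txn *txn)
{
	txn->stmt_active = 1;
	return 0;
}

static struct txn_stmt *
txn_current_stmt(struct txn *txn)
{
	return txn->stmt_active ? &txn->current : NULL;
}

static int
txn_commit_stmt(struct txn *txn)
{
	txn->stmt_active = 0;
	return 0;
}

static int
run_demo(void)
{
	struct txn txn = { .stmt_active = 0 };
	if (txn_begin_stmt(&txn) != 0)
		return 0;
	struct txn_stmt *stmt = txn_current_stmt(&txn);
	if (stmt == NULL)
		return 0;
	return txn_commit_stmt(&txn) == 0 && txn_current_stmt(&txn) == NULL;
}
```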

> 
> > diff --git a/src/box/txn.c b/src/box/txn.c
> > index 1d8271e51..21f34e526 100644
> > --- a/src/box/txn.c
> > +++ b/src/box/txn.c
> > @@ -441,14 +428,11 @@ fail:
> >  }
> >  
> >  void
> > -txn_rollback_stmt()
> > +txn_rollback_stmt(struct txn *txn)
> >  {
> > -	struct txn *txn = in_txn();
> >  	if (txn == NULL || txn->in_sub_stmt == 0)
> >  		return;
> 
> Hmm, how's that even possible now?

Nothing changed here; why would this become impossible?

-- 
Konstantin Osipov, Moscow, Russia

^ permalink raw reply	[flat|nested] 42+ messages in thread

* [tarantool-patches] Re: [PATCH v3 06/14] txn: get rid of fiber_gc from txn_rollback
  2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 06/14] txn: get rid of fiber_gc from txn_rollback Georgy Kirichenko
  2019-06-13 14:12   ` Vladimir Davydov
@ 2019-06-16 16:38   ` Konstantin Osipov
  1 sibling, 0 replies; 42+ messages in thread
From: Konstantin Osipov @ 2019-06-16 16:38 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

* Georgy Kirichenko <georgy@tarantool.org> [19/06/09 23:47]:
> Don't touch a fiber gc storage on a transaction rollback explicitly.
> This relaxes dependencies between fiber and transaction life cycles.
> 

Generally, it's quite difficult to track which review comments you
addressed, since you didn't reply to the review email :(

There is also no link to the branch :(


> Prerequisites: #1254
> ---
>  src/box/applier.cc     |  8 +++++---
>  src/box/box.cc         | 14 +++++++++-----
>  src/box/call.c         | 22 ++++++++++++++++------
>  src/box/memtx_engine.c |  3 ++-
>  src/box/txn.c          | 35 +++++++++++++++++------------------
>  src/box/txn.h          |  8 ++++++--
>  src/box/vy_scheduler.c | 10 +++++++---
>  7 files changed, 62 insertions(+), 38 deletions(-)
> 
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index e3203a4c8..5a92f6109 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -190,7 +190,7 @@ apply_initial_join_row(struct xrow_header *row)
>  	fiber_gc();
>  	return rc;
>  rollback:
> -	txn_rollback();
> +	txn_rollback(txn);
>  	return -1;
>  }

shouldn't there be a fiber_gc() here?

> @@ -334,7 +336,7 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row)
>  		}
>  		if (box_process_rw(&request, space, NULL) != 0) {
>  			say_error("error applying row: %s", request_str(&request));
> -			txn_rollback();
> +			txn_rollback(txn);

and here?

> @@ -395,7 +401,7 @@ txn_commit(struct txn *txn)
>  	if (txn->n_new_rows + txn->n_applier_rows > 0) {
>  		txn->signature = txn_write_to_wal(txn);
>  		if (txn->signature < 0)
> -			goto fail;
> +			return -1;
>  	}

I still don't understand this change.


-- 
Konstantin Osipov, Moscow, Russia

^ permalink raw reply	[flat|nested] 42+ messages in thread

end of thread, other threads:[~2019-06-17  5:16 UTC | newest]

Thread overview: 42+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2019-06-09 20:44 [tarantool-patches] [PATCH v3 00/14] Parallel applier Georgy Kirichenko
2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 01/14] txn: Fire a trigger after a transaction finalization Georgy Kirichenko
2019-06-09 21:59   ` [tarantool-patches] " Konstantin Osipov
2019-06-11 11:42   ` [tarantool-patches] " Vladimir Davydov
2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 02/14] ddl: synchronize privileges cache with actual data state Georgy Kirichenko
2019-06-11 13:13   ` Vladimir Davydov
2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 03/14] txn: transaction memory allocation Georgy Kirichenko
2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 04/14] ddl: place alter structures onto a txn memory region Georgy Kirichenko
2019-06-11 14:14   ` Vladimir Davydov
2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 05/14] txn: get rid of autocommit from a txn structure Georgy Kirichenko
2019-06-13 14:11   ` Vladimir Davydov
2019-06-16 16:20     ` [tarantool-patches] " Konstantin Osipov
2019-06-16 16:14   ` Konstantin Osipov
2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 06/14] txn: get rid of fiber_gc from txn_rollback Georgy Kirichenko
2019-06-13 14:12   ` Vladimir Davydov
2019-06-13 19:28     ` Георгий Кириченко
2019-06-14  9:21       ` Vladimir Davydov
2019-06-16 16:38   ` [tarantool-patches] " Konstantin Osipov
2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 07/14] wal: remove fiber from a journal_entry structure Georgy Kirichenko
2019-06-13 14:17   ` Vladimir Davydov
2019-06-13 19:33     ` Георгий Кириченко
2019-06-14  8:05       ` Vladimir Davydov
2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 08/14] wal: enable asyncronous wal writes Georgy Kirichenko
2019-06-13 14:21   ` Vladimir Davydov
2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 09/14] wal: a dedicated wal scheduling fiber Georgy Kirichenko
2019-06-13 14:24   ` Vladimir Davydov
2019-06-13 19:36     ` Георгий Кириченко
2019-06-14  9:20       ` Vladimir Davydov
2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 10/14] core: latch_unlock_external routine Georgy Kirichenko
2019-06-13 14:27   ` Vladimir Davydov
2019-06-13 19:38     ` Георгий Кириченко
2019-06-14  8:10       ` Vladimir Davydov
2019-06-14  9:18         ` Vladimir Davydov
2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 11/14] txn: introduce asynchronous txn commit Georgy Kirichenko
2019-06-13 14:34   ` Vladimir Davydov
2019-06-13 19:45     ` Георгий Кириченко
2019-06-14  7:58       ` Vladimir Davydov
2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 12/14] txn: handle fiber stop event at transaction level Georgy Kirichenko
2019-06-13 14:36   ` Vladimir Davydov
2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 13/14] applier: apply transaction in parallel Georgy Kirichenko
2019-06-13 15:17   ` Vladimir Davydov
2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 14/14] test: fix flaky test Georgy Kirichenko

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox