Tarantool development patches archive
* [tarantool-patches] [PATCH 00/10] Transaction refactoring
@ 2019-04-19 12:43 Georgy Kirichenko
  2019-04-19 12:43 ` [tarantool-patches] [PATCH 01/10] Introduce a txn memory region Georgy Kirichenko
                   ` (9 more replies)
  0 siblings, 10 replies; 21+ messages in thread
From: Georgy Kirichenko @ 2019-04-19 12:43 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

This patch set is focused on two major goals:
 1. make a transaction and a fiber independent of one another;
 2. make a transaction commit asynchronous.

The first one detaches a transaction from its fiber and requires a
separate memory region for each transaction to store the transaction
itself, its journal entry and its row data.

The second one allows more than one transaction per fiber to be in
flight at a time, and it is a prerequisite for a parallel applier.
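The asynchronous commit described above can be pictured with a toy model: a commit call only enqueues the transaction for the WAL and returns, and a completion callback fires later, so a single fiber may have several transactions in flight. All names below (`sketch_txn`, `sketch_journal`, `sketch_txn_commit_async`, ...) are hypothetical and do not reflect Tarantool's journal API; the WAL thread is simulated by a synchronous flush.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical, simplified model of a transaction awaiting its WAL write. */
struct sketch_txn {
	int id;
	int is_done;               /* set when the simulated write finishes */
	int rc;                    /* result of the simulated WAL write */
	void (*on_complete)(struct sketch_txn *txn);
	struct sketch_txn *next;   /* link in the pending-write queue */
};

/* FIFO of transactions whose WAL writes are in flight. */
struct sketch_journal {
	struct sketch_txn *head;
	struct sketch_txn **tail;
	int n_pending;
};

static int sketch_completed; /* counts completion callbacks */

static void
count_completion(struct sketch_txn *txn)
{
	(void)txn;
	sketch_completed++;
}

static void
sketch_journal_create(struct sketch_journal *j)
{
	j->head = NULL;
	j->tail = &j->head;
	j->n_pending = 0;
}

/* Submit and return immediately: the caller does not wait for the write. */
static void
sketch_txn_commit_async(struct sketch_journal *j, struct sketch_txn *txn)
{
	txn->is_done = 0;
	txn->next = NULL;
	*j->tail = txn;
	j->tail = &txn->next;
	j->n_pending++;
}

/* Simulate the WAL finishing all queued writes in submission order. */
static void
sketch_journal_flush(struct sketch_journal *j)
{
	while (j->head != NULL) {
		struct sketch_txn *txn = j->head;
		j->head = txn->next;
		j->n_pending--;
		txn->rc = 0;
		txn->is_done = 1;
		if (txn->on_complete != NULL)
			txn->on_complete(txn);
	}
	j->tail = &j->head;
}
```

Both transactions submitted by the same caller are pending at once, and each learns its outcome only through the callback.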

Issue: https://github.com/tarantool/tarantool/issues/1254
Branch: https://github.com/tarantool/tarantool/tree/g.kirichenko/gh-1254-txn-refactoring

Georgy Kirichenko (10):
  Introduce a txn memory region
  Alloc journal entry on a txn memory region
  Encode a dml statement to a transaction memory region
  Get rid of autocommit from a txn structure
  Get rid of fiber_gc from txn_rollback
  Require for txn in case of txn_begin_stmt/txn_rollback_stmt
  Remove fiber from a journal_entry structure
  Use mempool to alloc wal messages
  Enable asyncronous wal writes
  Introduce asynchronous txn commit

 src/box/applier.cc           |  36 ++++-
 src/box/box.cc               | 113 +++++++++++---
 src/box/call.c               |  16 +-
 src/box/errcode.h            |   2 +-
 src/box/index.cc             |  10 +-
 src/box/journal.c            |  27 +++-
 src/box/journal.h            |  66 ++++++++-
 src/box/memtx_engine.c       |  11 +-
 src/box/memtx_space.c        |   8 +-
 src/box/request.c            |   2 +-
 src/box/sql.c                |   2 +-
 src/box/sql/vdbe.c           |   2 +-
 src/box/txn.c                | 275 +++++++++++++++++++++++------------
 src/box/txn.h                |  35 +++--
 src/box/vinyl.c              |  12 +-
 src/box/vy_log.c             |   4 +-
 src/box/vy_scheduler.c       |  12 +-
 src/box/vy_stmt.c            |   6 +-
 src/box/wal.c                |  95 +++++++++---
 src/box/xrow.c               |  21 ++-
 src/box/xrow.h               |  12 +-
 test/box/misc.result         |   2 +-
 test/engine/savepoint.result |   2 +-
 test/sql/savepoints.result   |   6 +-
 24 files changed, 566 insertions(+), 211 deletions(-)

-- 
2.21.0


* [tarantool-patches] [PATCH 01/10] Introduce a txn memory region
  2019-04-19 12:43 [tarantool-patches] [PATCH 00/10] Transaction refactoring Georgy Kirichenko
@ 2019-04-19 12:43 ` Georgy Kirichenko
  2019-04-24 18:20   ` [tarantool-patches] " Konstantin Osipov
  2019-04-19 12:43 ` [tarantool-patches] [PATCH 02/10] Alloc journal entry on " Georgy Kirichenko
                   ` (8 subsequent siblings)
  9 siblings, 1 reply; 21+ messages in thread
From: Georgy Kirichenko @ 2019-04-19 12:43 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

Attach a separate memory region to each txn structure in order to store
all txn internal data until the transaction is finished. This patch is a
preparation for detaching a txn from its fiber and the fiber gc storage.

Prerequisites: #1254
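A minimal sketch of the caching scheme in this patch, assuming a plain `malloc`-backed bump allocator in place of Tarantool's `struct region`: the txn header is the first object on its own arena, freeing truncates the arena back to the header and pushes the txn onto a free list, and the next begin reuses it. The types and functions here are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical bump allocator standing in for struct region. */
struct arena {
	char *buf;
	size_t size;
	size_t used;
};

#define ARENA_ALIGN(n) (((n) + 15) & ~(size_t)15)

static void *
arena_alloc(struct arena *a, size_t n)
{
	/* Round up to 16 bytes so every object stays suitably aligned. */
	size_t aligned = ARENA_ALIGN(n);
	if (a->used + aligned > a->size)
		return NULL;
	void *p = a->buf + a->used;
	a->used += aligned;
	return p;
}

struct txn_sketch {
	struct txn_sketch *next_free; /* link in the txn cache */
	struct arena arena;           /* memory owned by this transaction */
	int n_stmts;
};

static struct txn_sketch *sketch_txn_cache = NULL;

/* Reuse a cached txn or carve a new one out of a fresh arena. */
static struct txn_sketch *
txn_sketch_new(size_t arena_size)
{
	if (sketch_txn_cache != NULL) {
		struct txn_sketch *txn = sketch_txn_cache;
		sketch_txn_cache = txn->next_free;
		return txn;
	}
	struct arena a = {malloc(arena_size), arena_size, 0};
	if (a.buf == NULL)
		return NULL;
	/* The txn header is the first object placed on its own arena. */
	struct txn_sketch *txn = arena_alloc(&a, sizeof(*txn));
	if (txn == NULL) {
		free(a.buf);
		return NULL;
	}
	txn->arena = a;
	return txn;
}

/* Drop everything allocated after the header, then cache the txn. */
static void
txn_sketch_free(struct txn_sketch *txn)
{
	txn->arena.used = ARENA_ALIGN(sizeof(*txn));
	txn->next_free = sketch_txn_cache;
	sketch_txn_cache = txn;
}
```

Keeping the header at the start of the arena means truncating to the header size never releases the txn itself; the patch does the same with `region_truncate(&txn->region, sizeof(struct txn))`.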
---
 src/box/errcode.h            |  2 +-
 src/box/sql/vdbe.c           |  2 +-
 src/box/txn.c                | 71 +++++++++++++++++++++++++++++-------
 src/box/txn.h                |  2 +
 test/box/misc.result         |  2 +-
 test/engine/savepoint.result |  2 +-
 test/sql/savepoints.result   |  6 +--
 7 files changed, 66 insertions(+), 21 deletions(-)

diff --git a/src/box/errcode.h b/src/box/errcode.h
index 3f8cb8e0e..a273826b3 100644
--- a/src/box/errcode.h
+++ b/src/box/errcode.h
@@ -166,7 +166,7 @@ struct errcode_record {
 	/*111 */_(ER_WRONG_SPACE_OPTIONS,	"Wrong space options (field %u): %s") \
 	/*112 */_(ER_UNSUPPORTED_INDEX_FEATURE,	"Index '%s' (%s) of space '%s' (%s) does not support %s") \
 	/*113 */_(ER_VIEW_IS_RO,		"View '%s' is read-only") \
-	/*114 */_(ER_SAVEPOINT_NO_TRANSACTION,	"Can not set a savepoint in absence of active transaction") \
+	/*114 */_(ER_NO_TRANSACTION,		"No active transaction") \
 	/*115 */_(ER_SYSTEM,			"%s") \
 	/*116 */_(ER_LOADING,			"Instance bootstrap hasn't finished yet") \
 	/*117 */_(ER_CONNECTION_TO_SELF,	"Connection to self") \
diff --git a/src/box/sql/vdbe.c b/src/box/sql/vdbe.c
index ed7bf8870..9fc70980c 100644
--- a/src/box/sql/vdbe.c
+++ b/src/box/sql/vdbe.c
@@ -3017,7 +3017,7 @@ case OP_Savepoint: {
 
 	if (psql_txn == NULL) {
 		assert(!box_txn());
-		diag_set(ClientError, ER_SAVEPOINT_NO_TRANSACTION);
+		diag_set(ClientError, ER_NO_TRANSACTION);
 		rc = SQL_TARANTOOL_ERROR;
 		goto abort_due_to_error;
 	}
diff --git a/src/box/txn.c b/src/box/txn.c
index d51bfc67a..815199645 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -37,6 +37,9 @@
 
 double too_long_threshold;
 
+/* Txn cache. */
+static struct stailq txn_cache = {NULL, &txn_cache.first};
+
 static inline void
 fiber_set_txn(struct fiber *fiber, struct txn *txn)
 {
@@ -44,7 +47,7 @@ fiber_set_txn(struct fiber *fiber, struct txn *txn)
 }
 
 static int
-txn_add_redo(struct txn_stmt *stmt, struct request *request)
+txn_add_redo(struct txn *txn, struct txn_stmt *stmt, struct request *request)
 {
 	stmt->row = request->header;
 	if (request->header != NULL)
@@ -52,7 +55,7 @@ txn_add_redo(struct txn_stmt *stmt, struct request *request)
 
 	/* Create a redo log row for Lua requests */
 	struct xrow_header *row;
-	row = region_alloc_object(&fiber()->gc, struct xrow_header);
+	row = region_alloc_object(&txn->region, struct xrow_header);
 	if (row == NULL) {
 		diag_set(OutOfMemory, sizeof(*row),
 			 "region", "struct xrow_header");
@@ -77,7 +80,7 @@ static struct txn_stmt *
 txn_stmt_new(struct txn *txn)
 {
 	struct txn_stmt *stmt;
-	stmt = region_alloc_object(&fiber()->gc, struct txn_stmt);
+	stmt = region_alloc_object(&txn->region, struct txn_stmt);
 	if (stmt == NULL) {
 		diag_set(OutOfMemory, sizeof(*stmt),
 			 "region", "struct txn_stmt");
@@ -134,16 +137,50 @@ txn_rollback_to_svp(struct txn *txn, struct stailq_entry *svp)
 	}
 }
 
+/*
+ * Return a txn from cache or create a new one if cache is empty.
+ */
+inline static struct txn *
+txn_new()
+{
+	if (!stailq_empty(&txn_cache))
+		return stailq_shift_entry(&txn_cache, struct txn, list);
+
+	/* Create a region. */
+	struct region region;
+	region_create(&region, &cord()->slabc);
+
+	/* Place txn structure on the region. */
+	struct txn *txn = region_alloc_object(&region, struct txn);
+	if (txn == NULL) {
+		diag_set(OutOfMemory, sizeof(*txn), "region", "struct txn");
+		return NULL;
+	}
+	assert(region_used(&region) == sizeof(*txn));
+	txn->region = region;
+	return txn;
+}
+
+/*
+ * Free txn memory and return it to a cache.
+ */
+inline static void
+txn_free(struct txn *txn)
+{
+	/* Truncate region up to struct txn size. */
+	region_truncate(&txn->region, sizeof(struct txn));
+	stailq_add(&txn_cache, &txn->list);
+}
+
 struct txn *
 txn_begin(bool is_autocommit)
 {
 	static int64_t tsn = 0;
 	assert(! in_txn());
-	struct txn *txn = region_alloc_object(&fiber()->gc, struct txn);
-	if (txn == NULL) {
-		diag_set(OutOfMemory, sizeof(*txn), "region", "struct txn");
+	struct txn *txn = txn_new();
+	if (txn == NULL)
 		return NULL;
-	}
+
 	/* Initialize members explicitly to save time on memset() */
 	stailq_create(&txn->stmts);
 	txn->n_new_rows = 0;
@@ -251,7 +288,7 @@ txn_commit_stmt(struct txn *txn, struct request *request)
 	 * stmt->space can be NULL for IRPOTO_NOP.
 	 */
 	if (stmt->space == NULL || !space_is_temporary(stmt->space)) {
-		if (txn_add_redo(stmt, request) != 0)
+		if (txn_add_redo(txn, stmt, request) != 0)
 			goto fail;
 		assert(stmt->row != NULL);
 		if (stmt->row->replica_id == 0) {
@@ -393,8 +430,8 @@ txn_commit(struct txn *txn)
 	stailq_foreach_entry(stmt, &txn->stmts, next)
 		txn_stmt_unref_tuples(stmt);
 
-	TRASH(txn);
 	fiber_set_txn(fiber(), NULL);
+	txn_free(txn);
 	return 0;
 fail:
 	txn_rollback();
@@ -433,10 +470,10 @@ txn_rollback()
 	stailq_foreach_entry(stmt, &txn->stmts, next)
 		txn_stmt_unref_tuples(stmt);
 
-	TRASH(txn);
 	/** Free volatile txn memory. */
 	fiber_gc();
 	fiber_set_txn(fiber(), NULL);
+	txn_free(txn);
 }
 
 void
@@ -521,12 +558,18 @@ box_txn_rollback()
 void *
 box_txn_alloc(size_t size)
 {
+	struct txn *txn = in_txn();
+	if (txn == NULL) {
+		/* There is no transaction yet - return an error. */
+		diag_set(ClientError, ER_NO_TRANSACTION);
+		return NULL;
+	}
 	union natural_align {
 		void *p;
 		double lf;
 		long l;
 	};
-	return region_aligned_alloc(&fiber()->gc, size,
+	return region_aligned_alloc(&txn->region, size,
 	                            alignof(union natural_align));
 }
 
@@ -535,11 +578,11 @@ box_txn_savepoint()
 {
 	struct txn *txn = in_txn();
 	if (txn == NULL) {
-		diag_set(ClientError, ER_SAVEPOINT_NO_TRANSACTION);
+		diag_set(ClientError, ER_NO_TRANSACTION);
 		return NULL;
 	}
 	struct txn_savepoint *svp =
-		(struct txn_savepoint *) region_alloc_object(&fiber()->gc,
+		(struct txn_savepoint *) region_alloc_object(&txn->region,
 							struct txn_savepoint);
 	if (svp == NULL) {
 		diag_set(OutOfMemory, sizeof(*svp),
@@ -558,7 +601,7 @@ box_txn_rollback_to_savepoint(box_txn_savepoint_t *svp)
 {
 	struct txn *txn = in_txn();
 	if (txn == NULL) {
-		diag_set(ClientError, ER_SAVEPOINT_NO_TRANSACTION);
+		diag_set(ClientError, ER_NO_TRANSACTION);
 		return -1;
 	}
 	struct txn_stmt *stmt = svp->stmt == NULL ? NULL :
diff --git a/src/box/txn.h b/src/box/txn.h
index 747b38db3..fe8a299f2 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -132,6 +132,8 @@ struct autoinc_id_entry {
 };
 
 struct txn {
+	struct stailq_entry list;
+	struct region region;
 	/**
 	 * A sequentially growing transaction id, assigned when
 	 * a transaction is initiated. Used to identify
diff --git a/test/box/misc.result b/test/box/misc.result
index a1f7a0990..d7d76d87e 100644
--- a/test/box/misc.result
+++ b/test/box/misc.result
@@ -443,7 +443,7 @@ t;
   111: box.error.WRONG_SPACE_OPTIONS
   112: box.error.UNSUPPORTED_INDEX_FEATURE
   113: box.error.VIEW_IS_RO
-  114: box.error.SAVEPOINT_NO_TRANSACTION
+  114: box.error.NO_TRANSACTION
   115: box.error.SYSTEM
   116: box.error.LOADING
   117: box.error.CONNECTION_TO_SELF
diff --git a/test/engine/savepoint.result b/test/engine/savepoint.result
index b3a918de0..86a2d0be2 100644
--- a/test/engine/savepoint.result
+++ b/test/engine/savepoint.result
@@ -14,7 +14,7 @@ s1 = nil
 ...
 s1 = box.savepoint()
 ---
-- error: Can not set a savepoint in absence of active transaction
+- error: No active transaction
 ...
 box.rollback_to_savepoint(s1)
 ---
diff --git a/test/sql/savepoints.result b/test/sql/savepoints.result
index 2f943bd9b..bb4a296fa 100644
--- a/test/sql/savepoints.result
+++ b/test/sql/savepoints.result
@@ -14,15 +14,15 @@ box.execute('pragma sql_default_engine=\''..engine..'\'')
 --
 box.execute('SAVEPOINT t1;');
 ---
-- error: Can not set a savepoint in absence of active transaction
+- error: No active transaction
 ...
 box.execute('RELEASE SAVEPOINT t1;');
 ---
-- error: Can not set a savepoint in absence of active transaction
+- error: No active transaction
 ...
 box.execute('ROLLBACK TO SAVEPOINT t1;');
 ---
-- error: Can not set a savepoint in absence of active transaction
+- error: No active transaction
 ...
 box.begin() box.execute('SAVEPOINT t1;') box.execute('RELEASE SAVEPOINT t1;') box.commit();
 ---
-- 
2.21.0


* [tarantool-patches] [PATCH 02/10] Alloc journal entry on a txn memory region
  2019-04-19 12:43 [tarantool-patches] [PATCH 00/10] Transaction refactoring Georgy Kirichenko
  2019-04-19 12:43 ` [tarantool-patches] [PATCH 01/10] Introduce a txn memory region Georgy Kirichenko
@ 2019-04-19 12:43 ` Georgy Kirichenko
  2019-04-24 18:21   ` [tarantool-patches] " Konstantin Osipov
  2019-04-19 12:43 ` [tarantool-patches] [PATCH 03/10] Encode a dml statement to a transaction " Georgy Kirichenko
                   ` (7 subsequent siblings)
  9 siblings, 1 reply; 21+ messages in thread
From: Georgy Kirichenko @ 2019-04-19 12:43 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

Use txn memory to allocate a journal entry structure.
This relaxes the dependency between a journal entry and a fiber.

Prerequisites: #1254
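The signature change amounts to passing the owning region explicitly instead of always using `fiber()->gc`. A self-contained sketch, assuming a fixed buffer in place of `struct region` and hypothetical names throughout, of allocating a header with a flexible array of row pointers at the required alignment:

```c
#include <assert.h>
#include <stdalign.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical fixed-buffer region; not Tarantool's struct region. */
struct region_sketch {
	char *buf;
	size_t size;
	size_t used;
};

/* Bump-allocate `size` bytes whose address is a multiple of `align`. */
static void *
region_sketch_aligned_alloc(struct region_sketch *r, size_t size,
			    size_t align)
{
	uintptr_t base = (uintptr_t)(r->buf + r->used);
	uintptr_t aligned = (base + align - 1) & ~(uintptr_t)(align - 1);
	size_t pad = aligned - base;
	if (r->used + pad + size > r->size)
		return NULL;
	r->used += pad + size;
	return (void *)aligned;
}

struct row_sketch;

/* Header plus a flexible array of row pointers. */
struct journal_entry_sketch {
	int64_t res;
	int n_rows;
	struct row_sketch *rows[];
};

/* The caller decides which region owns the entry. */
static struct journal_entry_sketch *
journal_entry_sketch_new(size_t n_rows, struct region_sketch *region)
{
	size_t size = sizeof(struct journal_entry_sketch) +
		      sizeof(struct row_sketch *) * n_rows;
	struct journal_entry_sketch *entry = region_sketch_aligned_alloc(
		region, size, alignof(struct journal_entry_sketch));
	if (entry == NULL)
		return NULL;
	entry->res = -1;
	entry->n_rows = (int)n_rows;
	return entry;
}
```

A transaction would pass its own region here, while a caller such as `vy_log_flush()` keeps passing the fiber gc region.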
---
 src/box/journal.c | 4 ++--
 src/box/journal.h | 4 +++-
 src/box/txn.c     | 3 ++-
 src/box/vy_log.c  | 2 +-
 4 files changed, 8 insertions(+), 5 deletions(-)

diff --git a/src/box/journal.c b/src/box/journal.c
index 7498ba192..fe13fb6ee 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -53,14 +53,14 @@ static struct journal dummy_journal = {
 struct journal *current_journal = &dummy_journal;
 
 struct journal_entry *
-journal_entry_new(size_t n_rows)
+journal_entry_new(size_t n_rows, struct region *region)
 {
 	struct journal_entry *entry;
 
 	size_t size = (sizeof(struct journal_entry) +
 		       sizeof(entry->rows[0]) * n_rows);
 
-	entry = region_aligned_alloc(&fiber()->gc, size,
+	entry = region_aligned_alloc(region, size,
 				     alignof(struct journal_entry));
 	if (entry == NULL) {
 		diag_set(OutOfMemory, size, "region", "struct journal_entry");
diff --git a/src/box/journal.h b/src/box/journal.h
index e52316888..8ac32ee5e 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -72,13 +72,15 @@ struct journal_entry {
 	struct xrow_header *rows[];
 };
 
+struct region;
+
 /**
  * Create a new journal entry.
  *
  * @return NULL if out of memory, fiber diagnostics area is set
  */
 struct journal_entry *
-journal_entry_new(size_t n_rows);
+journal_entry_new(size_t n_rows, struct region *region);
 
 /**
  * An API for an abstract journal for all transactions of this
diff --git a/src/box/txn.c b/src/box/txn.c
index 815199645..ed5e54bb6 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -332,7 +332,8 @@ txn_write_to_wal(struct txn *txn)
 	assert(txn->n_new_rows + txn->n_applier_rows > 0);
 
 	struct journal_entry *req = journal_entry_new(txn->n_new_rows +
-						      txn->n_applier_rows);
+						      txn->n_applier_rows,
+						      &txn->region);
 	if (req == NULL)
 		return -1;
 
diff --git a/src/box/vy_log.c b/src/box/vy_log.c
index be97cdbbe..85059588e 100644
--- a/src/box/vy_log.c
+++ b/src/box/vy_log.c
@@ -772,7 +772,7 @@ vy_log_flush(void)
 		tx_size++;
 
 	size_t used = region_used(&fiber()->gc);
-	struct journal_entry *entry = journal_entry_new(tx_size);
+	struct journal_entry *entry = journal_entry_new(tx_size, &fiber()->gc);
 	if (entry == NULL)
 		goto err;
 
-- 
2.21.0


* [tarantool-patches] [PATCH 03/10] Encode a dml statement to a transaction memory region
  2019-04-19 12:43 [tarantool-patches] [PATCH 00/10] Transaction refactoring Georgy Kirichenko
  2019-04-19 12:43 ` [tarantool-patches] [PATCH 01/10] Introduce a txn memory region Georgy Kirichenko
  2019-04-19 12:43 ` [tarantool-patches] [PATCH 02/10] Alloc journal entry on " Georgy Kirichenko
@ 2019-04-19 12:43 ` Georgy Kirichenko
  2019-04-24 18:28   ` [tarantool-patches] " Konstantin Osipov
  2019-04-19 12:44 ` [tarantool-patches] [PATCH 04/10] Get rid of autocommit from a txn structure Georgy Kirichenko
                   ` (6 subsequent siblings)
  9 siblings, 1 reply; 21+ messages in thread
From: Georgy Kirichenko @ 2019-04-19 12:43 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

Encode all statements to be written to the WAL onto a transaction
memory region. This relaxes the relation between a transaction and the
fiber state and is required for the autonomous transaction feature.

Prerequisites: #1254
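The new `copy_tuple` flag chooses between referencing the tuple bytes through an extra iovec and copying them into the region. One plausible motivation (an assumption, not stated in the patch) is that a copied tuple stays valid for the lifetime of the transaction region even if the request's own buffer is released earlier. The sketch below models only that branch: it emits a raw header instead of a MsgPack map, a plain buffer stands in for the region, and all names are hypothetical.

```c
#include <assert.h>
#include <string.h>
#include <sys/uio.h>

/*
 * Fill iov[0] with the header (plus the tuple when copying) placed in
 * `buf`; without copying, iov[1] points at the caller's tuple bytes.
 * Returns the number of iovecs used, or -1 if `buf` is too small.
 */
static int
encode_body_sketch(const char *header, size_t header_len,
		   const char *tuple, size_t tuple_len,
		   char *buf, size_t buf_size, int copy_tuple,
		   struct iovec *iov)
{
	size_t need = header_len + (copy_tuple ? tuple_len : 0);
	if (need > buf_size)
		return -1;
	memcpy(buf, header, header_len);
	char *pos = buf + header_len;
	int iovcnt = 1;
	if (copy_tuple) {
		/* Tuple bytes now live in the caller-owned buffer. */
		memcpy(pos, tuple, tuple_len);
		pos += tuple_len;
	} else {
		/* Zero-copy: reference the original tuple bytes. */
		iov[1].iov_base = (void *)tuple;
		iov[1].iov_len = tuple_len;
		iovcnt++;
	}
	iov[0].iov_base = buf;
	iov[0].iov_len = (size_t)(pos - buf);
	return iovcnt;
}
```

Copying costs a `memcpy` and region space, but the encoded row no longer depends on memory it does not own.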
---
 src/box/request.c |  2 +-
 src/box/txn.c     |  2 +-
 src/box/vy_log.c  |  2 +-
 src/box/vy_stmt.c |  6 ++++--
 src/box/xrow.c    | 21 +++++++++++++++------
 src/box/xrow.h    | 12 +++++++++---
 6 files changed, 31 insertions(+), 14 deletions(-)

diff --git a/src/box/request.c b/src/box/request.c
index 9c684af73..9edd1f8b1 100644
--- a/src/box/request.c
+++ b/src/box/request.c
@@ -61,7 +61,7 @@ request_update_header(struct request *request, struct xrow_header *row)
 	if (row == NULL)
 		return 0;
 	row->type = request->type;
-	row->bodycnt = xrow_encode_dml(request, row->body);
+	row->bodycnt = xrow_encode_dml(request, &fiber()->gc, false, row->body);
 	if (row->bodycnt < 0)
 		return -1;
 	request->header = row;
diff --git a/src/box/txn.c b/src/box/txn.c
index ed5e54bb6..ffada5984 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -68,7 +68,7 @@ txn_add_redo(struct txn *txn, struct txn_stmt *stmt, struct request *request)
 	row->lsn = 0;
 	row->sync = 0;
 	row->tm = 0;
-	row->bodycnt = xrow_encode_dml(request, row->body);
+	row->bodycnt = xrow_encode_dml(request, &txn->region, true, row->body);
 	if (row->bodycnt < 0)
 		return -1;
 	stmt->row = row;
diff --git a/src/box/vy_log.c b/src/box/vy_log.c
index 85059588e..76b21c822 100644
--- a/src/box/vy_log.c
+++ b/src/box/vy_log.c
@@ -520,7 +520,7 @@ vy_log_record_encode(const struct vy_log_record *record,
 	req.tuple_end = pos;
 	memset(row, 0, sizeof(*row));
 	row->type = req.type;
-	row->bodycnt = xrow_encode_dml(&req, row->body);
+	row->bodycnt = xrow_encode_dml(&req, &fiber()->gc, false, row->body);
 	return 0;
 }
 
diff --git a/src/box/vy_stmt.c b/src/box/vy_stmt.c
index e1cdd293d..3bc22965c 100644
--- a/src/box/vy_stmt.c
+++ b/src/box/vy_stmt.c
@@ -681,7 +681,8 @@ vy_stmt_encode_primary(struct tuple *value, struct key_def *key_def,
 	}
 	if (vy_stmt_meta_encode(value, &request, true) != 0)
 		return -1;
-	xrow->bodycnt = xrow_encode_dml(&request, xrow->body);
+	xrow->bodycnt = xrow_encode_dml(&request, &fiber()->gc, false,
+					xrow->body);
 	if (xrow->bodycnt < 0)
 		return -1;
 	return 0;
@@ -715,7 +716,8 @@ vy_stmt_encode_secondary(struct tuple *value, struct key_def *cmp_def,
 	}
 	if (vy_stmt_meta_encode(value, &request, false) != 0)
 		return -1;
-	xrow->bodycnt = xrow_encode_dml(&request, xrow->body);
+	xrow->bodycnt = xrow_encode_dml(&request, &fiber()->gc, false,
+					xrow->body);
 	if (xrow->bodycnt < 0)
 		return -1;
 	else
diff --git a/src/box/xrow.c b/src/box/xrow.c
index aaed84e38..1fc60f6d8 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -746,15 +746,19 @@ request_str(const struct request *request)
 }
 
 int
-xrow_encode_dml(const struct request *request, struct iovec *iov)
+xrow_encode_dml(const struct request *request, struct region *region,
+		bool copy_tuple, struct iovec *iov)
 {
 	int iovcnt = 1;
 	const int MAP_LEN_MAX = 40;
 	uint32_t key_len = request->key_end - request->key;
 	uint32_t ops_len = request->ops_end - request->ops;
 	uint32_t tuple_meta_len = request->tuple_meta_end - request->tuple_meta;
-	uint32_t len = MAP_LEN_MAX + key_len + ops_len + tuple_meta_len;
-	char *begin = (char *) region_alloc(&fiber()->gc, len);
+	uint32_t tuple_len = copy_tuple ?
+			     (request->tuple_end - request->tuple) : 0;
+	uint32_t len = MAP_LEN_MAX + key_len + ops_len + tuple_meta_len +
+		       tuple_len;
+	char *begin = (char *) region_alloc(region, len);
 	if (begin == NULL) {
 		diag_set(OutOfMemory, len, "region_alloc", "begin");
 		return -1;
@@ -796,9 +800,14 @@ xrow_encode_dml(const struct request *request, struct iovec *iov)
 	}
 	if (request->tuple) {
 		pos = mp_encode_uint(pos, IPROTO_TUPLE);
-		iov[iovcnt].iov_base = (void *) request->tuple;
-		iov[iovcnt].iov_len = (request->tuple_end - request->tuple);
-		iovcnt++;
+		if (copy_tuple) {
+			memcpy(pos, request->tuple, tuple_len);
+			pos += tuple_len;
+		} else {
+			iov[iovcnt].iov_base = (void *) request->tuple;
+			iov[iovcnt].iov_len = (request->tuple_end - request->tuple);
+			iovcnt++;
+		}
 		map_size++;
 	}
 
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 3a8cba191..c1d737efd 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -54,6 +54,8 @@ enum {
 	IPROTO_SELECT_HEADER_LEN = IPROTO_HEADER_LEN + 7,
 };
 
+struct region;
+
 struct xrow_header {
 	/* (!) Please update txn_add_redo() after changing members */
 
@@ -195,12 +197,15 @@ xrow_decode_dml(struct xrow_header *xrow, struct request *request,
 /**
  * Encode the request fields to iovec using region_alloc().
  * @param request request to encode
+ * @param region region to allocate the encoded data on
+ * @param copy_tuple if true, the tuple is copied to the region
  * @param iov[out] iovec to fill
  * @retval -1 on error, see diag
  * @retval > 0 the number of iovecs used
  */
 int
-xrow_encode_dml(const struct request *request, struct iovec *iov);
+xrow_encode_dml(const struct request *request, struct region *region,
+		bool copy_tuple, struct iovec *iov);
 
 /**
  * CALL/EVAL request.
@@ -713,9 +718,10 @@ xrow_decode_dml_xc(struct xrow_header *row, struct request *request,
 
 /** @copydoc xrow_encode_dml. */
 static inline int
-xrow_encode_dml_xc(const struct request *request, struct iovec *iov)
+xrow_encode_dml_xc(const struct request *request, struct region *region,
+		   bool copy_tuple, struct iovec *iov)
 {
-	int iovcnt = xrow_encode_dml(request, iov);
+	int iovcnt = xrow_encode_dml(request, region, copy_tuple, iov);
 	if (iovcnt < 0)
 		diag_raise();
 	return iovcnt;
-- 
2.21.0


* [tarantool-patches] [PATCH 04/10] Get rid of autocommit from a txn structure
  2019-04-19 12:43 [tarantool-patches] [PATCH 00/10] Transaction refactoring Georgy Kirichenko
                   ` (2 preceding siblings ...)
  2019-04-19 12:43 ` [tarantool-patches] [PATCH 03/10] Encode a dml statement to a transaction " Georgy Kirichenko
@ 2019-04-19 12:44 ` Georgy Kirichenko
  2019-04-24 19:07   ` [tarantool-patches] " Konstantin Osipov
  2019-04-19 12:44 ` [tarantool-patches] [PATCH 05/10] Get rid of fiber_gc from txn_rollback Georgy Kirichenko
                   ` (5 subsequent siblings)
  9 siblings, 1 reply; 21+ messages in thread
From: Georgy Kirichenko @ 2019-04-19 12:44 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

Move transaction auto-start and auto-commit behavior to the box level.
From now on, a transaction won't start or commit automatically without
explicit txn_begin/txn_commit invocations. This is part of a bigger
transaction refactoring aimed at implementing detachable transactions
and a parallel applier.

Prerequisites: #1254
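The box-level pattern that replaces `is_autocommit` (compare `box_process_rw()` in the diff below) reduces to a small model: begin a transaction only when none is active, and commit or roll back only a transaction that was started at this call site. The single global "current transaction" and all function names are hypothetical simplifications of `in_txn()`, `txn_begin()` and `txn_commit()`.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical single-fiber model of the current transaction. */
struct txn_model {
	int n_stmts;
	int committed;
	int rolled_back;
};

static struct txn_model txn_storage;
static struct txn_model *current_txn = NULL;

static struct txn_model *
model_begin(void)
{
	assert(current_txn == NULL);
	txn_storage = (struct txn_model){0, 0, 0};
	current_txn = &txn_storage;
	return current_txn;
}

static void
model_commit(struct txn_model *txn)
{
	txn->committed = 1;
	current_txn = NULL;
}

static void
model_rollback(struct txn_model *txn)
{
	txn->rolled_back = 1;
	current_txn = NULL;
}

/*
 * Execute one statement. Start and finish a transaction only if the
 * caller has not opened one (the autocommit case).
 */
static int
process_stmt(int stmt_ok)
{
	struct txn_model *txn = current_txn;
	int autocommit = txn == NULL;
	if (autocommit)
		txn = model_begin();
	if (!stmt_ok) {
		if (autocommit)
			model_rollback(txn);
		return -1;
	}
	txn->n_stmts++;
	if (autocommit)
		model_commit(txn);
	return 0;
}
```

Inside an explicit transaction the statement leaves the outcome to whoever called `model_begin()`, which is what lets the lower layers drop the autocommit flag.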
---
 src/box/applier.cc     | 27 ++++++++++++---
 src/box/box.cc         | 76 +++++++++++++++++++++++++++++++-----------
 src/box/memtx_engine.c | 10 ++++--
 src/box/txn.c          | 25 ++++----------
 src/box/txn.h          |  7 +---
 src/box/vinyl.c        | 12 +++----
 6 files changed, 99 insertions(+), 58 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 373e1feb9..3b74e0f54 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -172,11 +172,22 @@ applier_writer_f(va_list ap)
 static int
 apply_initial_join_row(struct xrow_header *row)
 {
+	struct txn *txn = txn_begin();
+	if (txn == NULL)
+		return -1;
 	struct request request;
 	xrow_decode_dml(row, &request, dml_request_key_map(row->type));
-	struct space *space = space_cache_find_xc(request.space_id);
+	struct space *space = space_cache_find(request.space_id);
+	if (space == NULL)
+		return -1;
 	/* no access checks here - applier always works with admin privs */
-	return space_apply_initial_join_row(space, &request);
+	if (space_apply_initial_join_row(space, &request)) {
+		txn_rollback();
+		return -1;
+	}
+	int rc = txn_commit(txn);
+	fiber_gc();
+	return rc;
 }
 
 /**
@@ -403,8 +414,16 @@ applier_join(struct applier *applier)
 		applier->last_row_time = ev_monotonic_now(loop());
 		if (iproto_type_is_dml(row.type)) {
 			vclock_follow_xrow(&replicaset.vclock, &row);
-			if (apply_row(&row) != 0)
+			struct txn *txn = txn_begin();
+			if (txn == NULL)
+				diag_raise();
+			if (apply_row(&row) != 0) {
+				txn_rollback();
+				diag_raise();
+			}
+			if (txn_commit(txn) != 0)
 				diag_raise();
+			fiber_gc();
 			if (++row_count % 100000 == 0)
 				say_info("%.1fM rows received", row_count / 1e6);
 		} else if (row.type == IPROTO_OK) {
@@ -555,7 +574,7 @@ applier_apply_tx(struct stailq *rows)
 	 * conflict safely access failed xrow object and allocate
 	 * IPROTO_NOP on gc.
 	 */
-	struct txn *txn = txn_begin(false);
+	struct txn *txn = txn_begin();
 	struct applier_tx_row *item;
 	if (txn == NULL)
 		diag_raise();
diff --git a/src/box/box.cc b/src/box/box.cc
index 7828f575b..54210474f 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -169,34 +169,54 @@ int
 box_process_rw(struct request *request, struct space *space,
 	       struct tuple **result)
 {
+	struct tuple *tuple = NULL;
+	struct txn *txn = in_txn();
+	bool autocommit = txn == NULL;
+	if (txn == NULL && (txn = txn_begin()) == NULL)
+		return -1;
 	assert(iproto_type_is_dml(request->type));
 	rmean_collect(rmean_box, request->type, 1);
 	if (access_check_space(space, PRIV_W) != 0)
-		return -1;
-	struct txn *txn = txn_begin_stmt(space);
-	if (txn == NULL)
-		return -1;
-	struct tuple *tuple;
+		goto fail;
+	if (txn_begin_stmt(space) == NULL)
+		goto fail;
 	if (space_execute_dml(space, txn, request, &tuple) != 0) {
 		txn_rollback_stmt();
-		return -1;
+		goto fail;
 	}
-	if (result == NULL)
-		return txn_commit_stmt(txn, request);
-	*result = tuple;
-	if (tuple == NULL)
-		return txn_commit_stmt(txn, request);
 	/*
 	 * Pin the tuple locally before the commit,
 	 * otherwise it may go away during yield in
 	 * when WAL is written in autocommit mode.
 	 */
-	tuple_ref(tuple);
-	int rc = txn_commit_stmt(txn, request);
-	if (rc == 0)
+	if (tuple != NULL)
+		tuple_ref(tuple);
+
+	if (result != NULL)
+		*result = tuple;
+
+	if (txn_commit_stmt(txn, request) != 0)
+		goto fail;
+	if (autocommit) {
+		if (txn_commit(txn) != 0)
+			goto fail;
+		fiber_gc();
+	}
+
+	if (tuple != NULL && result != NULL) {
 		tuple_bless(tuple);
-	tuple_unref(tuple);
-	return rc;
+	}
+
+	if (tuple != NULL)
+		tuple_unref(tuple);
+	return 0;
+
+fail:
+	if (autocommit)
+		txn_rollback();
+	if (tuple != NULL)
+		tuple_unref(tuple);
+	return -1;
 }
 
 void
@@ -299,8 +319,12 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row)
 	xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
 	if (request.type != IPROTO_NOP) {
 		struct space *space = space_cache_find_xc(request.space_id);
-		if (box_process_rw(&request, space, NULL) != 0) {
+		struct txn *txn = txn_begin();
+		if (txn == NULL || box_process_rw(&request, space, NULL) != 0 ||
+		    txn_commit(txn) != 0) {
 			say_error("error applying row: %s", request_str(&request));
+			if (txn != NULL)
+				txn_rollback();
 			diag_raise();
 		}
 	}
@@ -1313,8 +1337,15 @@ box_sequence_reset(uint32_t seq_id)
 static inline void
 box_register_replica(uint32_t id, const struct tt_uuid *uuid)
 {
+	struct txn *txn = txn_begin();
+	if (txn == NULL)
+		diag_raise();
 	if (boxk(IPROTO_INSERT, BOX_CLUSTER_ID, "[%u%s]",
-		 (unsigned) id, tt_uuid_str(uuid)) != 0)
+		 (unsigned) id, tt_uuid_str(uuid)) != 0) {
+		txn_rollback();
+		diag_raise();
+	}
+	if (txn_commit(txn) != 0)
 		diag_raise();
 	assert(replica_by_uuid(uuid)->id == id);
 }
@@ -1636,9 +1667,16 @@ box_set_replicaset_uuid(const struct tt_uuid *replicaset_uuid)
 		uu = *replicaset_uuid;
 	else
 		tt_uuid_create(&uu);
+	struct txn *txn = txn_begin();
+	if (txn == NULL)
+		diag_raise();
 	/* Save replica set UUID in _schema */
 	if (boxk(IPROTO_INSERT, BOX_SCHEMA_ID, "[%s%s]", "cluster",
-		 tt_uuid_str(&uu)))
+		 tt_uuid_str(&uu))) {
+		txn_rollback();
+		diag_raise();
+	}
+	if (txn_commit(txn) != 0)
 		diag_raise();
 }
 
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index 4d99910cb..afa1739c6 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -272,16 +272,22 @@ memtx_engine_recover_snapshot_row(struct memtx_engine *memtx,
 		diag_set(ClientError, ER_CROSS_ENGINE_TRANSACTION);
 		return -1;
 	}
+	struct txn *txn = txn_begin();
+	if (txn == NULL)
+		return -1;
 	/* no access checks here - applier always works with admin privs */
-	if (space_apply_initial_join_row(space, &request) != 0)
+	if (space_apply_initial_join_row(space, &request) != 0) {
+		txn_rollback();
 		return -1;
+	}
+	int rc = txn_commit(txn);
 	/*
 	 * Don't let gc pool grow too much. Yet to
 	 * it before reading the next row, to make
 	 * sure it's not freed along here.
 	 */
 	fiber_gc();
-	return 0;
+	return rc;
 }
 
 /** Called at start to tell memtx to recover to a given LSN. */
diff --git a/src/box/txn.c b/src/box/txn.c
index ffada5984..804962767 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -173,7 +173,7 @@ txn_free(struct txn *txn)
 }
 
 struct txn *
-txn_begin(bool is_autocommit)
+txn_begin()
 {
 	static int64_t tsn = 0;
 	assert(! in_txn());
@@ -186,7 +186,6 @@ txn_begin(bool is_autocommit)
 	txn->n_new_rows = 0;
 	txn->n_local_rows = 0;
 	txn->n_applier_rows = 0;
-	txn->is_autocommit = is_autocommit;
 	txn->has_triggers  = false;
 	txn->is_aborted = false;
 	txn->in_sub_stmt = 0;
@@ -224,19 +223,15 @@ txn_begin_stmt(struct space *space)
 {
 	struct txn *txn = in_txn();
 	if (txn == NULL) {
-		txn = txn_begin(true);
-		if (txn == NULL)
-			return NULL;
+		diag_set(ClientError, ER_NO_TRANSACTION);
+		return NULL;
 	} else if (txn->in_sub_stmt > TXN_SUB_STMT_MAX) {
 		diag_set(ClientError, ER_SUB_STMT_MAX);
 		return NULL;
 	}
 	struct txn_stmt *stmt = txn_stmt_new(txn);
-	if (stmt == NULL) {
-		if (txn->is_autocommit && txn->in_sub_stmt == 0)
-			txn_rollback();
+	if (stmt == NULL)
 		return NULL;
-	}
 	if (space == NULL)
 		return txn;
 
@@ -271,8 +266,7 @@ txn_is_distributed(struct txn *txn)
 }
 
 /**
- * End a statement. In autocommit mode, end
- * the current transaction as well.
+ * End a statement.
  */
 int
 txn_commit_stmt(struct txn *txn, struct request *request)
@@ -315,11 +309,6 @@ txn_commit_stmt(struct txn *txn, struct request *request)
 			goto fail;
 	}
 	--txn->in_sub_stmt;
-	if (txn->is_autocommit && txn->in_sub_stmt == 0) {
-		int rc = txn_commit(txn);
-		fiber_gc();
-		return rc;
-	}
 	return 0;
 fail:
 	txn_rollback_stmt();
@@ -446,8 +435,6 @@ txn_rollback_stmt()
 	if (txn == NULL || txn->in_sub_stmt == 0)
 		return;
 	txn->in_sub_stmt--;
-	if (txn->is_autocommit && txn->in_sub_stmt == 0)
-		return txn_rollback();
 	txn_rollback_to_svp(txn, txn->sub_stmt_begin[txn->in_sub_stmt]);
 }
 
@@ -518,7 +505,7 @@ box_txn_begin()
 		diag_set(ClientError, ER_ACTIVE_TRANSACTION);
 		return -1;
 	}
-	if (txn_begin(false) == NULL)
+	if (txn_begin() == NULL)
 		return -1;
 	return 0;
 }
diff --git a/src/box/txn.h b/src/box/txn.h
index fe8a299f2..7f999b8e9 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -154,11 +154,6 @@ struct txn {
 	 * already assigned LSN.
 	 */
 	int n_applier_rows;
-	/**
-	 * True if this transaction is running in autocommit mode
-	 * (statement end causes an automatic transaction commit).
-	 */
-	bool is_autocommit;
 	/**
 	 * True if the transaction was aborted so should be
 	 * rolled back at commit.
@@ -206,7 +201,7 @@ in_txn()
  * @pre no transaction is active
  */
 struct txn *
-txn_begin(bool is_autocommit);
+txn_begin();
 
 /**
  * Commit a transaction.
diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index 41918beba..fe258e784 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -2399,10 +2399,8 @@ vinyl_engine_begin(struct engine *engine, struct txn *txn)
 	txn->engine_tx = vy_tx_begin(env->xm);
 	if (txn->engine_tx == NULL)
 		return -1;
-	if (!txn->is_autocommit) {
-		trigger_create(&txn->fiber_on_stop, txn_on_stop, NULL, NULL);
-		trigger_add(&fiber()->on_stop, &txn->fiber_on_stop);
-	}
+	trigger_create(&txn->fiber_on_stop, txn_on_stop, NULL, NULL);
+	trigger_add(&fiber()->on_stop, &txn->fiber_on_stop);
 	return 0;
 }
 
@@ -2472,8 +2470,7 @@ vinyl_engine_commit(struct engine *engine, struct txn *txn)
 	vy_regulator_check_dump_watermark(&env->regulator);
 
 	txn->engine_tx = NULL;
-	if (!txn->is_autocommit)
-		trigger_clear(&txn->fiber_on_stop);
+	trigger_clear(&txn->fiber_on_stop);
 }
 
 static void
@@ -2487,8 +2484,7 @@ vinyl_engine_rollback(struct engine *engine, struct txn *txn)
 	vy_tx_rollback(tx);
 
 	txn->engine_tx = NULL;
-	if (!txn->is_autocommit)
-		trigger_clear(&txn->fiber_on_stop);
+	trigger_clear(&txn->fiber_on_stop);
 }
 
 static int
-- 
2.21.0

* [tarantool-patches] [PATCH 05/10] Get rid of fiber_gc from txn_rollback
  2019-04-19 12:43 [tarantool-patches] [PATCH 00/10] Transaction refactoring Georgy Kirichenko
                   ` (3 preceding siblings ...)
  2019-04-19 12:44 ` [tarantool-patches] [PATCH 04/10] Get rid of autocommit from a txn structure Georgy Kirichenko
@ 2019-04-19 12:44 ` Georgy Kirichenko
  2019-04-24 19:12   ` [tarantool-patches] " Konstantin Osipov
  2019-04-19 12:44 ` [tarantool-patches] [PATCH 06/10] Require for txn in case of txn_begin_stmt/txn_rollback_stmt Georgy Kirichenko
                   ` (4 subsequent siblings)
  9 siblings, 1 reply; 21+ messages in thread
From: Georgy Kirichenko @ 2019-04-19 12:44 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

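Don't touch the fiber gc region on transaction rollback: txn_rollback()
now takes the transaction as an argument and no longer calls fiber_gc(),
so callers that own the region release it explicitly. This relaxes the
dependency between fiber and transaction life cycles.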
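
A minimal sketch of the resulting caller contract, in plain C with
hypothetical stand-ins (`struct scratch_region` for the fiber gc region,
`current_txn` for `in_txn()`, `scratch_reset()` for `fiber_gc()`; none of
these are Tarantool APIs): rollback only undoes and frees the transaction,
and the code that owns the scratch memory resets it explicitly, as the
callers in this patch now do.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical stand-in for the fiber gc region. */
struct scratch_region {
	size_t used;	/* bytes handed out since the last reset */
};

/* Hypothetical stand-in for a transaction. */
struct txn {
	int n_stmts;
};

static struct txn *current_txn;	/* plays the role of in_txn() */

/* Plays the role of fiber_gc(). */
static void
scratch_reset(struct scratch_region *r)
{
	r->used = 0;
}

/* Rollback undoes and frees the transaction, nothing else. */
static void
txn_rollback(struct txn *txn)
{
	assert(txn == current_txn);	/* mirrors @pre txn == in_txn() */
	current_txn = NULL;
	free(txn);
}

/* A caller that owns the scratch region releases it explicitly. */
static int
process_request_failed(struct scratch_region *gc)
{
	if (current_txn != NULL)
		txn_rollback(current_txn);
	scratch_reset(gc);
	return -1;
}
```

Keeping the region reset out of txn_rollback() is what allows a
transaction to be rolled back by code that does not own the fiber's memory.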

Prerequisites: #1254
---
 src/box/applier.cc     |  9 ++++++---
 src/box/box.cc         | 24 +++++++++++++++++-------
 src/box/call.c         | 16 ++++++++++++----
 src/box/memtx_engine.c |  3 ++-
 src/box/txn.c          | 22 +++++++++++-----------
 src/box/txn.h          |  7 +++++--
 src/box/vy_scheduler.c |  6 ++++--
 7 files changed, 57 insertions(+), 30 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 3b74e0f54..42b6efc4d 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -182,7 +182,8 @@ apply_initial_join_row(struct xrow_header *row)
 		return -1;
 	/* no access checks here - applier always works with admin privs */
 	if (space_apply_initial_join_row(space, &request)) {
-		txn_rollback();
+		txn_rollback(txn);
+		fiber_gc();
 		return -1;
 	}
 	int rc = txn_commit(txn);
@@ -418,7 +419,8 @@ applier_join(struct applier *applier)
 			if (txn == NULL)
 				diag_raise();
 			if (apply_row(&row) != 0) {
-				txn_rollback();
+				txn_rollback(txn);
+				fiber_gc();
 				diag_raise();
 			}
 			if (txn_commit(txn) != 0)
@@ -621,7 +623,8 @@ applier_apply_tx(struct stailq *rows)
 	return txn_commit(txn);
 
 rollback:
-	txn_rollback();
+	txn_rollback(txn);
+	fiber_gc();
 	return -1;
 }
 
diff --git a/src/box/box.cc b/src/box/box.cc
index 54210474f..835a00c95 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -197,9 +197,12 @@ box_process_rw(struct request *request, struct space *space,
 
 	if (txn_commit_stmt(txn, request) != 0)
 		goto fail;
+
 	if (autocommit) {
-		if (txn_commit(txn) != 0)
+		if (txn_commit(txn) != 0) {
+			txn = NULL;
 			goto fail;
+		}
 		fiber_gc();
 	}
 
@@ -212,8 +215,11 @@ box_process_rw(struct request *request, struct space *space,
 	return 0;
 
 fail:
-	if (autocommit)
-		txn_rollback();
+	if (autocommit) {
+		if (txn != NULL)
+			txn_rollback(txn);
+		fiber_gc();
+	}
 	if (tuple != NULL)
 		tuple_unref(tuple);
 	return -1;
@@ -323,8 +329,10 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row)
 		if (txn == NULL || box_process_rw(&request, space, NULL) != 0 ||
 		    txn_commit(txn) != 0) {
 			say_error("error applying row: %s", request_str(&request));
-			if (txn != NULL)
-				txn_rollback();
+			if (txn != NULL) {
+				txn_rollback(txn);
+				fiber_gc();
+			}
 			diag_raise();
 		}
 	}
@@ -1342,7 +1350,8 @@ box_register_replica(uint32_t id, const struct tt_uuid *uuid)
 		diag_raise();
 	if (boxk(IPROTO_INSERT, BOX_CLUSTER_ID, "[%u%s]",
 		 (unsigned) id, tt_uuid_str(uuid)) != 0) {
-		txn_rollback();
+		txn_rollback(txn);
+		fiber_gc();
 		diag_raise();
 	}
 	if (txn_commit(txn) != 0)
@@ -1673,7 +1682,8 @@ box_set_replicaset_uuid(const struct tt_uuid *replicaset_uuid)
 	/* Save replica set UUID in _schema */
 	if (boxk(IPROTO_INSERT, BOX_SCHEMA_ID, "[%s%s]", "cluster",
 		 tt_uuid_str(&uu))) {
-		txn_rollback();
+		txn_rollback(txn);
+		fiber_gc();
 		diag_raise();
 	}
 	if (txn_commit(txn) != 0)
diff --git a/src/box/call.c b/src/box/call.c
index b9750c5f3..4b5f155df 100644
--- a/src/box/call.c
+++ b/src/box/call.c
@@ -208,13 +208,17 @@ box_process_call(struct call_request *request, struct port *port)
 		fiber_set_user(fiber(), orig_credentials);
 
 	if (rc != 0) {
-		txn_rollback();
+		if (in_txn() != NULL)
+			txn_rollback(in_txn());
+		fiber_gc();
 		return -1;
 	}
 
 	if (in_txn()) {
 		diag_set(ClientError, ER_FUNCTION_TX_ACTIVE);
-		txn_rollback();
+		if (in_txn() != NULL)
+			txn_rollback(in_txn());
+		fiber_gc();
 		return -1;
 	}
 
@@ -229,13 +233,17 @@ box_process_eval(struct call_request *request, struct port *port)
 	if (access_check_universe(PRIV_X) != 0)
 		return -1;
 	if (box_lua_eval(request, port) != 0) {
-		txn_rollback();
+		if (in_txn() != NULL)
+			txn_rollback(in_txn());
+		fiber_gc();
 		return -1;
 	}
 
 	if (in_txn()) {
 		diag_set(ClientError, ER_FUNCTION_TX_ACTIVE);
-		txn_rollback();
+		if (in_txn() != NULL)
+			txn_rollback(in_txn());
+		fiber_gc();
 		return -1;
 	}
 
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index afa1739c6..cf2205c68 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -277,7 +277,8 @@ memtx_engine_recover_snapshot_row(struct memtx_engine *memtx,
 		return -1;
 	/* no access checks here - applier always works with admin privs */
 	if (space_apply_initial_join_row(space, &request) != 0) {
-		txn_rollback();
+		txn_rollback(txn);
+		fiber_gc();
 		return -1;
 	}
 	int rc = txn_commit(txn);
diff --git a/src/box/txn.c b/src/box/txn.c
index 804962767..fdfffa144 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -347,7 +347,7 @@ txn_write_to_wal(struct txn *txn)
 
 	if (res < 0) {
 		/* Cascading rollback. */
-		txn_rollback(); /* Perform our part of cascading rollback. */
+		txn_rollback(txn); /* Perform our part of cascading rollback. */
 		/*
 		 * Move fiber to end of event loop to avoid
 		 * execution of any new requests before all
@@ -396,7 +396,7 @@ txn_commit(struct txn *txn)
 	if (txn->n_new_rows + txn->n_applier_rows > 0) {
 		txn->signature = txn_write_to_wal(txn);
 		if (txn->signature < 0)
-			goto fail;
+			return -1;
 	}
 	/*
 	 * The transaction is in the binary log. No action below
@@ -424,7 +424,7 @@ txn_commit(struct txn *txn)
 	txn_free(txn);
 	return 0;
 fail:
-	txn_rollback();
+	txn_rollback(txn);
 	return -1;
 }
 
@@ -439,11 +439,9 @@ txn_rollback_stmt()
 }
 
 void
-txn_rollback()
+txn_rollback(struct txn *txn)
 {
-	struct txn *txn = in_txn();
-	if (txn == NULL)
-		return;
+	assert(txn == in_txn());
 	/* Rollback triggers must not throw. */
 	if (txn->has_triggers &&
 	    trigger_run(&txn->on_rollback, txn) != 0) {
@@ -458,8 +456,6 @@ txn_rollback()
 	stailq_foreach_entry(stmt, &txn->stmts, next)
 		txn_stmt_unref_tuples(stmt);
 
-	/** Free volatile txn memory. */
-	fiber_gc();
 	fiber_set_txn(fiber(), NULL);
 	txn_free(txn);
 }
@@ -535,11 +531,14 @@ int
 box_txn_rollback()
 {
 	struct txn *txn = in_txn();
+	if (txn == NULL)
+		return 0;
 	if (txn && txn->in_sub_stmt) {
 		diag_set(ClientError, ER_ROLLBACK_IN_SUB_STMT);
 		return -1;
 	}
-	txn_rollback(); /* doesn't throw */
+	txn_rollback(txn); /* doesn't throw */
+	fiber_gc();
 	return 0;
 }
 
@@ -617,6 +616,7 @@ txn_on_stop(struct trigger *trigger, void *event)
 {
 	(void) trigger;
 	(void) event;
-	txn_rollback();                 /* doesn't yield or fail */
+	txn_rollback(in_txn());                 /* doesn't yield or fail */
+
 }
 
diff --git a/src/box/txn.h b/src/box/txn.h
index 7f999b8e9..ae2d7b9a5 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -213,9 +213,12 @@ txn_begin();
 int
 txn_commit(struct txn *txn);
 
-/** Rollback a transaction, if any. */
+/**
+ * Rollback a transaction.
+ * @pre txn == in_txn()
+ */
 void
-txn_rollback();
+txn_rollback(struct txn *txn);
 
 /**
  * Roll back the transaction but keep the object around.
diff --git a/src/box/vy_scheduler.c b/src/box/vy_scheduler.c
index fabb4bb48..1f6b30f4a 100644
--- a/src/box/vy_scheduler.c
+++ b/src/box/vy_scheduler.c
@@ -889,8 +889,10 @@ vy_deferred_delete_batch_process_f(struct cmsg *cmsg)
 	for (int i = 0; i < batch->count; i++) {
 		if (vy_deferred_delete_process_one(deferred_delete_space,
 						   pk->space_id, pk->mem_format,
-						   &batch->stmt[i]) != 0)
+						   &batch->stmt[i]) != 0) {
+			txn_rollback(txn);
 			goto fail;
+		}
 	}
 
 	if (txn_commit(txn) != 0)
@@ -900,7 +902,7 @@ vy_deferred_delete_batch_process_f(struct cmsg *cmsg)
 fail:
 	batch->is_failed = true;
 	diag_move(diag_get(), &batch->diag);
-	txn_rollback();
+	fiber_gc();
 }
 
 /**
-- 
2.21.0

* [tarantool-patches] [PATCH 06/10] Require for txn in case of txn_begin_stmt/txn_rollback_stmt
  2019-04-19 12:43 [tarantool-patches] [PATCH 00/10] Transaction refactoring Georgy Kirichenko
                   ` (4 preceding siblings ...)
  2019-04-19 12:44 ` [tarantool-patches] [PATCH 05/10] Get rid of fiber_gc from txn_rollback Georgy Kirichenko
@ 2019-04-19 12:44 ` Georgy Kirichenko
  2019-04-24 19:13   ` [tarantool-patches] " Konstantin Osipov
  2019-04-19 12:44 ` [tarantool-patches] [PATCH 07/10] Remove fiber from a journal_entry structure Georgy Kirichenko
                   ` (3 subsequent siblings)
  9 siblings, 1 reply; 21+ messages in thread
From: Georgy Kirichenko @ 2019-04-19 12:44 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

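Pass a txn structure explicitly to the txn_begin_stmt() and
txn_rollback_stmt() functions; txn_begin_stmt() now returns the new
statement instead of the transaction. Also add an on_error trigger list
to a journal entry, fired on a failed WAL write before the waiting
requests are rescheduled.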
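
A simplified sketch of the on_error ordering, assuming a hand-rolled
singly linked trigger list. The `struct trigger` layout,
`journal_entry_complete()` and `struct engine_state` are hypothetical
illustrations, not the `trigger.h`/`rlist` API used by the patch.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical minimal trigger: a callback, its data and a list link. */
struct trigger {
	void (*run)(struct trigger *trigger, void *event);
	void *data;
	struct trigger *next;
};

struct journal_entry {
	long res;			/* result of the write, -1 until known */
	struct trigger *on_error;	/* head of a singly linked list */
};

static void
journal_entry_on_error(struct journal_entry *entry, struct trigger *trigger)
{
	/* Push to the head: the last added trigger runs first. */
	trigger->next = entry->on_error;
	entry->on_error = trigger;
}

/* Hypothetical engine state that has to be undone on a failed write. */
struct engine_state {
	int rolled_back;
};

static void
engine_rollback_cb(struct trigger *trigger, void *event)
{
	(void) event;
	struct engine_state *state = trigger->data;
	state->rolled_back = 1;
}

/* Completion path: on failure, error triggers run before anything else. */
static void
journal_entry_complete(struct journal_entry *entry, long res)
{
	entry->res = res;
	if (res < 0) {
		for (struct trigger *t = entry->on_error; t != NULL; t = t->next)
			t->run(t, NULL);
	}
	/* Only after this would the waiting fibers be rescheduled. */
}
```

Running the rollback callback first means engine state is already undone
by the time any other fiber gets a chance to run.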

Prerequisites: #1254
---
 src/box/applier.cc     |  4 ++--
 src/box/box.cc         |  8 ++++----
 src/box/index.cc       | 10 +++++-----
 src/box/journal.c      |  1 +
 src/box/journal.h      | 16 +++++++++++++++
 src/box/memtx_space.c  |  8 ++++----
 src/box/sql.c          |  2 +-
 src/box/txn.c          | 45 ++++++++++++++++++++++++------------------
 src/box/txn.h          |  9 ++++-----
 src/box/vy_scheduler.c |  6 +++---
 src/box/wal.c          |  5 ++++-
 11 files changed, 70 insertions(+), 44 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 42b6efc4d..43b1a84bc 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -201,8 +201,8 @@ static int
 process_nop(struct request *request)
 {
 	assert(request->type == IPROTO_NOP);
-	struct txn *txn = txn_begin_stmt(NULL);
-	if (txn == NULL)
+	struct txn *txn = in_txn();
+	if (txn_begin_stmt(txn, NULL) == NULL)
 		return -1;
 	return txn_commit_stmt(txn, request);
 }
diff --git a/src/box/box.cc b/src/box/box.cc
index 835a00c95..46cd444fd 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -178,10 +178,10 @@ box_process_rw(struct request *request, struct space *space,
 	rmean_collect(rmean_box, request->type, 1);
 	if (access_check_space(space, PRIV_W) != 0)
 		goto fail;
-	if (txn_begin_stmt(space) == NULL)
+	if (txn_begin_stmt(txn, space) == NULL)
 		goto fail;
 	if (space_execute_dml(space, txn, request, &tuple) != 0) {
-		txn_rollback_stmt();
+		txn_rollback_stmt(txn);
 		goto fail;
 	}
 	/*
@@ -1087,7 +1087,7 @@ box_select(uint32_t space_id, uint32_t index_id,
 	struct iterator *it = index_create_iterator(index, type,
 						    key, part_count);
 	if (it == NULL) {
-		txn_rollback_stmt();
+		txn_rollback_stmt(txn);
 		return -1;
 	}
 
@@ -1112,7 +1112,7 @@ box_select(uint32_t space_id, uint32_t index_id,
 
 	if (rc != 0) {
 		port_destroy(port);
-		txn_rollback_stmt();
+		txn_rollback_stmt(txn);
 		return -1;
 	}
 	txn_commit_ro_stmt(txn);
diff --git a/src/box/index.cc b/src/box/index.cc
index 2817d076d..baee0c1cb 100644
--- a/src/box/index.cc
+++ b/src/box/index.cc
@@ -239,7 +239,7 @@ box_index_get(uint32_t space_id, uint32_t index_id, const char *key,
 	if (txn_begin_ro_stmt(space, &txn) != 0)
 		return -1;
 	if (index_get(index, key, part_count, result) != 0) {
-		txn_rollback_stmt();
+		txn_rollback_stmt(txn);
 		return -1;
 	}
 	txn_commit_ro_stmt(txn);
@@ -273,7 +273,7 @@ box_index_min(uint32_t space_id, uint32_t index_id, const char *key,
 	if (txn_begin_ro_stmt(space, &txn) != 0)
 		return -1;
 	if (index_min(index, key, part_count, result) != 0) {
-		txn_rollback_stmt();
+		txn_rollback_stmt(txn);
 		return -1;
 	}
 	txn_commit_ro_stmt(txn);
@@ -305,7 +305,7 @@ box_index_max(uint32_t space_id, uint32_t index_id, const char *key,
 	if (txn_begin_ro_stmt(space, &txn) != 0)
 		return -1;
 	if (index_max(index, key, part_count, result) != 0) {
-		txn_rollback_stmt();
+		txn_rollback_stmt(txn);
 		return -1;
 	}
 	txn_commit_ro_stmt(txn);
@@ -339,7 +339,7 @@ box_index_count(uint32_t space_id, uint32_t index_id, int type,
 		return -1;
 	ssize_t count = index_count(index, itype, key, part_count);
 	if (count < 0) {
-		txn_rollback_stmt();
+		txn_rollback_stmt(txn);
 		return -1;
 	}
 	txn_commit_ro_stmt(txn);
@@ -376,7 +376,7 @@ box_index_iterator(uint32_t space_id, uint32_t index_id, int type,
 	struct iterator *it = index_create_iterator(index, itype,
 						    key, part_count);
 	if (it == NULL) {
-		txn_rollback_stmt();
+		txn_rollback_stmt(txn);
 		return NULL;
 	}
 	txn_commit_ro_stmt(txn);
diff --git a/src/box/journal.c b/src/box/journal.c
index fe13fb6ee..5cffc7452 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -70,6 +70,7 @@ journal_entry_new(size_t n_rows, struct region *region)
 	entry->n_rows = n_rows;
 	entry->res = -1;
 	entry->fiber = fiber();
+	rlist_create(&entry->on_error);
 	return entry;
 }
 
diff --git a/src/box/journal.h b/src/box/journal.h
index 8ac32ee5e..94be0b007 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -32,6 +32,7 @@
  */
 #include <stdint.h>
 #include <stdbool.h>
+#include "trigger.h"
 #include "salad/stailq.h"
 
 #if defined(__cplusplus)
@@ -58,6 +59,12 @@ struct journal_entry {
 	 * The fiber issuing the request.
 	 */
 	struct fiber *fiber;
+	/**
+	 * A trigger list to call if write failed. Triggers are going to be
+	 * fired before any other processing and are a good place to implement
+	 * any rollback non-yielding behavior.
+	 */
+	struct rlist on_error;
 	/**
 	 * Approximate size of this request when encoded.
 	 */
@@ -82,6 +89,15 @@ struct region;
 struct journal_entry *
 journal_entry_new(size_t n_rows, struct region *region);
 
+/**
+ * Add an on_error trigger to a journal entry.
+ */
+static inline void
+journal_entry_on_error(struct journal_entry *entry, struct trigger *trigger)
+{
+	trigger_add(&entry->on_error, trigger);
+}
+
 /**
  * An API for an abstract journal for all transactions of this
  * instance, as well as for multiple instances in case of
diff --git a/src/box/memtx_space.c b/src/box/memtx_space.c
index a28204d30..f2b28fb71 100644
--- a/src/box/memtx_space.c
+++ b/src/box/memtx_space.c
@@ -310,10 +310,10 @@ memtx_space_apply_initial_join_row(struct space *space, struct request *request)
 		return -1;
 	}
 	request->header->replica_id = 0;
-	struct txn *txn = txn_begin_stmt(space);
-	if (txn == NULL)
+	struct txn *txn = in_txn();
+	struct txn_stmt *stmt = txn_begin_stmt(txn, space);
+	if (stmt == NULL)
 		return -1;
-	struct txn_stmt *stmt = txn_current_stmt(txn);
 	stmt->new_tuple = memtx_tuple_new(space->format, request->tuple,
 					  request->tuple_end);
 	if (stmt->new_tuple == NULL)
@@ -326,7 +326,7 @@ memtx_space_apply_initial_join_row(struct space *space, struct request *request)
 
 rollback:
 	say_error("rollback: %s", diag_last_error(diag_get())->errmsg);
-	txn_rollback_stmt();
+	txn_rollback_stmt(txn);
 	return -1;
 }
 
diff --git a/src/box/sql.c b/src/box/sql.c
index 1fb93e106..0051b93c0 100644
--- a/src/box/sql.c
+++ b/src/box/sql.c
@@ -911,7 +911,7 @@ cursor_seek(BtCursor *pCur, int *pRes)
 				      part_count);
 	if (it == NULL) {
 		if (txn != NULL)
-			txn_rollback_stmt();
+			txn_rollback_stmt(txn);
 		pCur->eState = CURSOR_INVALID;
 		return SQL_TARANTOOL_ITERATOR_FAIL;
 	}
diff --git a/src/box/txn.c b/src/box/txn.c
index fdfffa144..8f5b66480 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -218,14 +218,10 @@ txn_begin_in_engine(struct engine *engine, struct txn *txn)
 	return 0;
 }
 
-struct txn *
-txn_begin_stmt(struct space *space)
+struct txn_stmt *
+txn_begin_stmt(struct txn *txn, struct space *space)
 {
-	struct txn *txn = in_txn();
-	if (txn == NULL) {
-		diag_set(ClientError, ER_NO_TRANSACTION);
-		return NULL;
-	} else if (txn->in_sub_stmt > TXN_SUB_STMT_MAX) {
+	if (txn->in_sub_stmt > TXN_SUB_STMT_MAX) {
 		diag_set(ClientError, ER_SUB_STMT_MAX);
 		return NULL;
 	}
@@ -233,7 +229,7 @@ txn_begin_stmt(struct space *space)
 	if (stmt == NULL)
 		return NULL;
 	if (space == NULL)
-		return txn;
+		return stmt;
 
 	if (trigger_run(&space->on_stmt_begin, txn) != 0)
 		goto fail;
@@ -246,9 +242,9 @@ txn_begin_stmt(struct space *space)
 	if (engine_begin_statement(engine, txn) != 0)
 		goto fail;
 
-	return txn;
+	return stmt;
 fail:
-	txn_rollback_stmt();
+	txn_rollback_stmt(txn);
 	return NULL;
 }
 
@@ -311,10 +307,23 @@ txn_commit_stmt(struct txn *txn, struct request *request)
 	--txn->in_sub_stmt;
 	return 0;
 fail:
-	txn_rollback_stmt();
+	txn_rollback_stmt(txn);
 	return -1;
 }
 
+/*
+ * Callback invoked if the journal write failed.
+ */
+static void
+journal_write_error_cb(struct trigger *trigger, void *event)
+{
+	(void) event;
+	struct txn *txn = (struct txn *)trigger->data;
+	if (txn->engine)
+		engine_rollback(txn->engine, txn);
+	txn->engine = NULL;
+}
+
 static int64_t
 txn_write_to_wal(struct txn *txn)
 {
@@ -326,6 +335,10 @@ txn_write_to_wal(struct txn *txn)
 	if (req == NULL)
 		return -1;
 
+	struct trigger on_error;
+	trigger_create(&on_error, journal_write_error_cb, txn, NULL);
+	journal_entry_on_error(req, &on_error);
+
 	struct txn_stmt *stmt;
 	struct xrow_header **remote_row = req->rows;
 	struct xrow_header **local_row = req->rows + txn->n_applier_rows;
@@ -348,12 +361,6 @@ txn_write_to_wal(struct txn *txn)
 	if (res < 0) {
 		/* Cascading rollback. */
 		txn_rollback(txn); /* Perform our part of cascading rollback. */
-		/*
-		 * Move fiber to end of event loop to avoid
-		 * execution of any new requests before all
-		 * pending rollbacks are processed.
-		 */
-		fiber_reschedule();
 		diag_set(ClientError, ER_WAL_IO);
 		diag_log();
 	} else if (stop - start > too_long_threshold) {
@@ -429,9 +436,8 @@ fail:
 }
 
 void
-txn_rollback_stmt()
+txn_rollback_stmt(struct txn *txn)
 {
-	struct txn *txn = in_txn();
 	if (txn == NULL || txn->in_sub_stmt == 0)
 		return;
 	txn->in_sub_stmt--;
@@ -449,6 +455,7 @@ txn_rollback(struct txn *txn)
 		unreachable();
 		panic("rollback trigger failed");
 	}
+
 	if (txn->engine)
 		engine_rollback(txn->engine, txn);
 
diff --git a/src/box/txn.h b/src/box/txn.h
index ae2d7b9a5..96719638b 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -261,11 +261,10 @@ txn_on_rollback(struct txn *txn, struct trigger *trigger)
 }
 
 /**
- * Start a new statement. If no current transaction,
- * start a new transaction with autocommit = true.
+ * Start a new statement.
  */
-struct txn *
-txn_begin_stmt(struct space *space);
+struct txn_stmt *
+txn_begin_stmt(struct txn *txn, struct space *space);
 
 int
 txn_begin_in_engine(struct engine *engine, struct txn *txn);
@@ -324,7 +323,7 @@ txn_commit_stmt(struct txn *txn, struct request *request);
  * rolls back the entire transaction.
  */
 void
-txn_rollback_stmt();
+txn_rollback_stmt(struct txn *txn);
 
 /**
  * Raise an error if this is a multi-statement transaction: DDL
diff --git a/src/box/vy_scheduler.c b/src/box/vy_scheduler.c
index 1f6b30f4a..053558c8c 100644
--- a/src/box/vy_scheduler.c
+++ b/src/box/vy_scheduler.c
@@ -840,14 +840,14 @@ vy_deferred_delete_process_one(struct space *deferred_delete_space,
 
 	tuple_unref(delete);
 
-	struct txn *txn = txn_begin_stmt(deferred_delete_space);
-	if (txn == NULL)
+	struct txn *txn = in_txn();
+	if (txn_begin_stmt(txn, deferred_delete_space) == NULL)
 		return -1;
 
 	struct tuple *unused;
 	if (space_execute_dml(deferred_delete_space, txn,
 			      &request, &unused) != 0) {
-		txn_rollback_stmt();
+		txn_rollback_stmt(txn);
 		return -1;
 	}
 	return txn_commit_stmt(txn, &request);
diff --git a/src/box/wal.c b/src/box/wal.c
index ad8ff7c62..8dfa3ef27 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -300,7 +300,10 @@ tx_schedule_rollback(struct cmsg *msg)
 	 * in-memory database state.
 	 */
 	stailq_reverse(&writer->rollback);
-	/* Must not yield. */
+	/* Call error callback for each request before any scheduling. */
+	struct journal_entry *req;
+	stailq_foreach_entry(req, &writer->rollback, fifo)
+		trigger_run(&req->on_error, NULL);
 	tx_schedule_queue(&writer->rollback);
 	stailq_create(&writer->rollback);
 }
-- 
2.21.0

* [tarantool-patches] [PATCH 07/10] Remove fiber from a journal_entry structure
  2019-04-19 12:43 [tarantool-patches] [PATCH 00/10] Transaction refactoring Georgy Kirichenko
                   ` (5 preceding siblings ...)
  2019-04-19 12:44 ` [tarantool-patches] [PATCH 06/10] Require for txn in case of txn_begin_stmt/txn_rollback_stmt Georgy Kirichenko
@ 2019-04-19 12:44 ` Georgy Kirichenko
  2019-04-24 19:16   ` [tarantool-patches] " Konstantin Osipov
  2019-04-19 12:44 ` [tarantool-patches] [PATCH 08/10] Use mempool to alloc wal messages Georgy Kirichenko
                   ` (2 subsequent siblings)
  9 siblings, 1 reply; 21+ messages in thread
From: Georgy Kirichenko @ 2019-04-19 12:44 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

Use a trigger to handle the journal entry write-done event instead of
storing the issuing fiber in the entry. This relaxes the coupling between
fiber and transaction life cycles.
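
A sketch of the done-flag plus done-trigger pattern in single-threaded C.
It assumes a hypothetical toy event loop (`toy_loop_step()`) in place of
the fiber scheduler and a counter in place of `fiber_cond_signal()`; only
the shape (set `done`, fire the triggers, re-check `done` after every
wakeup) mirrors the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical minimal trigger: a callback, its data and a list link. */
struct trigger {
	void (*run)(struct trigger *trigger, void *event);
	void *data;
	struct trigger *next;
};

struct journal_entry {
	long res;
	bool done;			/* set once the WAL has processed the entry */
	struct trigger *on_done;	/* replaces a pointer to the issuing fiber */
};

static void
journal_entry_on_done(struct journal_entry *entry, struct trigger *trigger)
{
	trigger->next = entry->on_done;
	entry->on_done = trigger;
}

/* Stand-in for tx_schedule_queue(): mark done, then notify. */
static void
journal_entry_finish(struct journal_entry *entry, long res)
{
	entry->res = res;
	entry->done = true;
	for (struct trigger *t = entry->on_done; t != NULL; t = t->next)
		t->run(t, entry);
}

/* Counts signals, where the patch would call fiber_cond_signal(). */
static void
on_write_done(struct trigger *trigger, void *event)
{
	(void) event;
	int *n_signals = trigger->data;
	++*n_signals;
}

/* Toy event loop for the demo: completes the write on its n-th step. */
struct toy_loop {
	struct journal_entry *entry;
	int steps;
	int complete_at;
	long result;
};

static void
toy_loop_step(void *arg)
{
	struct toy_loop *loop = arg;
	if (++loop->steps == loop->complete_at)
		journal_entry_finish(loop->entry, loop->result);
}

/* Waiter: a wakeup alone proves nothing, so re-check the flag each time. */
static long
journal_entry_wait(struct journal_entry *entry,
		   void (*step)(void *arg), void *arg)
{
	while (!entry->done)
		step(arg);
	return entry->res;
}
```

Re-checking `done` in the loop is what makes an early or spurious wakeup
harmless, which is why `wal_write()` loops on `entry->done` around
`fiber_cond_wait()`.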

Prerequisites: #1254
---
 src/box/journal.c |  4 ++--
 src/box/journal.h | 16 ++++++++++++++--
 src/box/wal.c     | 36 +++++++++++++++++++++++++-----------
 3 files changed, 41 insertions(+), 15 deletions(-)

diff --git a/src/box/journal.c b/src/box/journal.c
index 5cffc7452..b0f4d48b5 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -30,7 +30,6 @@
  */
 #include "journal.h"
 #include <small/region.h>
-#include <fiber.h>
 #include <diag.h>
 
 /**
@@ -66,11 +65,12 @@ journal_entry_new(size_t n_rows, struct region *region)
 		diag_set(OutOfMemory, size, "region", "struct journal_entry");
 		return NULL;
 	}
+	rlist_create(&entry->done_trigger);
 	entry->approx_len = 0;
 	entry->n_rows = n_rows;
 	entry->res = -1;
-	entry->fiber = fiber();
 	rlist_create(&entry->on_error);
+	entry->done = false;
 	return entry;
 }
 
diff --git a/src/box/journal.h b/src/box/journal.h
index 94be0b007..4a2fb3585 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -34,6 +34,7 @@
 #include <stdbool.h>
 #include "trigger.h"
 #include "salad/stailq.h"
+#include "trigger.h"
 
 #if defined(__cplusplus)
 extern "C" {
@@ -56,9 +57,14 @@ struct journal_entry {
 	 */
 	int64_t res;
 	/**
-	 * The fiber issuing the request.
+	 * Set to true once the entry has been processed by WAL.
+	 */
+	bool done;
+	/**
+	 * Triggers fired when journal entry processing is done,
+	 * regardless of whether the write succeeded.
 	 */
-	struct fiber *fiber;
+	struct rlist done_trigger;
 	/**
 	 * A trigger list to call if write failed. Triggers are going to be
 	 * fired before any other processing and are a good place to implement
@@ -109,6 +115,12 @@ struct journal {
 	void (*destroy)(struct journal *journal);
 };
 
+static inline void
+journal_entry_on_done(struct journal_entry *entry, struct trigger *trigger)
+{
+	trigger_add(&entry->done_trigger, trigger);
+}
+
 /**
  * Depending on the step of recovery and instance configuration
  * points at a concrete implementation of the journal.
diff --git a/src/box/wal.c b/src/box/wal.c
index 8dfa3ef27..6ccc1220a 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -258,8 +258,10 @@ tx_schedule_queue(struct stailq *queue)
 	 * are many ready fibers.
 	 */
 	struct journal_entry *req;
-	stailq_foreach_entry(req, queue, fifo)
-		fiber_wakeup(req->fiber);
+	stailq_foreach_entry(req, queue, fifo) {
+		req->done = true;
+		trigger_run(&req->done_trigger, NULL);
+	}
 }
 
 /**
@@ -1116,6 +1118,14 @@ wal_writer_f(va_list ap)
 	return 0;
 }
 
+static void
+on_wal_write_done(struct trigger *trigger, void *event)
+{
+	(void) event;
+	struct fiber_cond *cond = (struct fiber_cond *)trigger->data;
+	fiber_cond_signal(cond);
+}
+
 /**
  * WAL writer main entry point: queue a single request
  * to be written to disk and wait until this task is completed.
@@ -1167,15 +1177,19 @@ wal_write(struct journal *journal, struct journal_entry *entry)
 	batch->approx_len += entry->approx_len;
 	writer->wal_pipe.n_input += entry->n_rows * XROW_IOVMAX;
 	cpipe_flush_input(&writer->wal_pipe);
-	/**
-	 * It's not safe to spuriously wakeup this fiber
-	 * since in that case it will ignore a possible
-	 * error from WAL writer and not roll back the
-	 * transaction.
-	 */
-	bool cancellable = fiber_set_cancellable(false);
-	fiber_yield(); /* Request was inserted. */
-	fiber_set_cancellable(cancellable);
+
+	struct fiber_cond done_cond;
+	fiber_cond_create(&done_cond);
+
+	struct trigger done_trigger;
+	trigger_create(&done_trigger, on_wal_write_done, &done_cond, NULL);
+	journal_entry_on_done(entry, &done_trigger);
+	while (!entry->done)
+		fiber_cond_wait(&done_cond);
+
+	fiber_cond_destroy(&done_cond);
+	trigger_clear(&done_trigger);
+
 	return entry->res;
 }
 
-- 
2.21.0

* [tarantool-patches] [PATCH 08/10] Use mempool to alloc wal messages
  2019-04-19 12:43 [tarantool-patches] [PATCH 00/10] Transaction refactoring Georgy Kirichenko
                   ` (6 preceding siblings ...)
  2019-04-19 12:44 ` [tarantool-patches] [PATCH 07/10] Remove fiber from a journal_entry structure Georgy Kirichenko
@ 2019-04-19 12:44 ` Georgy Kirichenko
  2019-04-24 19:18   ` [tarantool-patches] " Konstantin Osipov
  2019-04-19 12:44 ` [tarantool-patches] [PATCH 09/10] Enable asyncronous wal writes Georgy Kirichenko
  2019-04-19 12:44 ` [tarantool-patches] [PATCH 10/10] Introduce asynchronous txn commit Georgy Kirichenko
  9 siblings, 1 reply; 21+ messages in thread
From: Georgy Kirichenko @ 2019-04-19 12:44 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

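Don't use the fiber gc region to allocate WAL messages; allocate them
from a dedicated mempool and free them once the tx thread has processed
them. This relaxes the coupling between the fiber life cycle and
transaction processing.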
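
A fixed-size pool lets each message live until the tx thread is done with
it, independent of any fiber. The sketch below is a hypothetical free-list
pool (`struct obj_pool` and its functions are illustrative only, not the
`small` library's `mempool` API):

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* A freed object is reused as a link in the free list. */
struct free_node {
	struct free_node *next;
};

struct obj_pool {
	size_t obj_size;		/* at least sizeof(struct free_node) */
	struct free_node *free_list;	/* recycled objects */
	size_t n_live;			/* objects currently handed out */
};

static void
obj_pool_create(struct obj_pool *pool, size_t obj_size)
{
	pool->obj_size = obj_size < sizeof(struct free_node) ?
			 sizeof(struct free_node) : obj_size;
	pool->free_list = NULL;
	pool->n_live = 0;
}

static void *
obj_pool_alloc(struct obj_pool *pool)
{
	void *obj;
	if (pool->free_list != NULL) {
		/* Reuse a previously freed object. */
		obj = pool->free_list;
		pool->free_list = pool->free_list->next;
	} else {
		obj = malloc(pool->obj_size);
		if (obj == NULL)
			return NULL;
	}
	pool->n_live++;
	return obj;
}

static void
obj_pool_free(struct obj_pool *pool, void *obj)
{
	struct free_node *node = obj;
	node->next = pool->free_list;
	pool->free_list = node;
	pool->n_live--;
}

static void
obj_pool_destroy(struct obj_pool *pool)
{
	assert(pool->n_live == 0);	/* every object must be returned */
	while (pool->free_list != NULL) {
		struct free_node *next = pool->free_list->next;
		free(pool->free_list);
		pool->free_list = next;
	}
}
```

In the patch, a message is returned to the pool in `tx_schedule_commit()`
and `tx_schedule_rollback()` (except for the writer's embedded
`in_rollback` message), rather than being reclaimed when a fiber's gc
region is reset.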

Prerequisites: #1254
---
 src/box/wal.c | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)

diff --git a/src/box/wal.c b/src/box/wal.c
index 6ccc1220a..f0352e938 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -89,6 +89,8 @@ struct wal_writer
 	struct stailq rollback;
 	/** A pipe from 'tx' thread to 'wal' */
 	struct cpipe wal_pipe;
+	/** A memory pool for messages. */
+	struct mempool msg_pool;
 	/* ----------------- wal ------------------- */
 	/** A setting from instance configuration - rows_per_wal */
 	int64_t wal_max_rows;
@@ -287,6 +289,7 @@ tx_schedule_commit(struct cmsg *msg)
 	/* Update the tx vclock to the latest written by wal. */
 	vclock_copy(&replicaset.vclock, &batch->vclock);
 	tx_schedule_queue(&batch->commit);
+	mempool_free(&writer->msg_pool, container_of(msg, struct wal_msg, base));
 }
 
 static void
@@ -308,6 +311,9 @@ tx_schedule_rollback(struct cmsg *msg)
 		trigger_run(&req->on_error, NULL);
 	tx_schedule_queue(&writer->rollback);
 	stailq_create(&writer->rollback);
+	if (msg != &writer->in_rollback)
+		mempool_free(&writer->msg_pool,
+			     container_of(msg, struct wal_msg, base));
 }
 
 
@@ -378,6 +384,9 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
 
 	writer->on_garbage_collection = on_garbage_collection;
 	writer->on_checkpoint_threshold = on_checkpoint_threshold;
+
+	mempool_create(&writer->msg_pool, &cord()->slabc,
+		       sizeof(struct wal_msg));
 }
 
 /** Destroy a WAL writer structure. */
@@ -1158,8 +1167,7 @@ wal_write(struct journal *journal, struct journal_entry *entry)
 
 		stailq_add_tail_entry(&batch->commit, entry, fifo);
 	} else {
-		batch = (struct wal_msg *)
-			region_alloc(&fiber()->gc, sizeof(struct wal_msg));
+		batch = (struct wal_msg *)mempool_alloc(&writer->msg_pool);
 		if (batch == NULL) {
 			diag_set(OutOfMemory, sizeof(struct wal_msg),
 				 "region", "struct wal_msg");
-- 
2.21.0

* [tarantool-patches] [PATCH 09/10] Enable asyncronous wal writes
  2019-04-19 12:43 [tarantool-patches] [PATCH 00/10] Transaction refactoring Georgy Kirichenko
                   ` (7 preceding siblings ...)
  2019-04-19 12:44 ` [tarantool-patches] [PATCH 08/10] Use mempool to alloc wal messages Georgy Kirichenko
@ 2019-04-19 12:44 ` Georgy Kirichenko
  2019-04-24 19:19   ` [tarantool-patches] " Konstantin Osipov
  2019-04-19 12:44 ` [tarantool-patches] [PATCH 10/10] Introduce asynchronous txn commit Georgy Kirichenko
  9 siblings, 1 reply; 21+ messages in thread
From: Georgy Kirichenko @ 2019-04-19 12:44 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

Allow sending a journal entry to WAL without waiting until the write is
finished. Two methods are introduced:
 * async_write schedules an entry to be written and returns 0 if the
   entry was successfully scheduled;
 * async_wait waits until the write is finished and returns the result
   of the journal write.
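
Splitting the write lets a caller keep several entries in flight and wait
for them later. A minimal sketch, assuming a two-method vtable (this
`struct journal` omits `write` and `destroy`) and a toy in-memory journal
that hands out increasing signatures; `journal_write_sync()` is a
hypothetical helper showing how a blocking write decomposes into the two
calls, not something this patch adds.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

struct journal_entry {
	int64_t res;		/* signature, or -1 until written */
	int scheduled;		/* set by async_write */
};

struct journal {
	int64_t (*async_write)(struct journal *journal,
			       struct journal_entry *entry);
	int64_t (*async_wait)(struct journal *journal,
			      struct journal_entry *entry);
};

/* Hypothetical helper: a blocking write expressed via the async pair. */
static int64_t
journal_write_sync(struct journal *journal, struct journal_entry *entry)
{
	if (journal->async_write(journal, entry) != 0)
		return -1;	/* the entry was never scheduled */
	return journal->async_wait(journal, entry);
}

/* Toy in-memory journal: assigns increasing signatures on wait. */
struct toy_journal {
	struct journal base;	/* must be first for the downcast below */
	int64_t next_lsn;
};

static int64_t
toy_async_write(struct journal *base, struct journal_entry *entry)
{
	(void) base;
	entry->scheduled = 1;
	return 0;
}

static int64_t
toy_async_wait(struct journal *base, struct journal_entry *entry)
{
	struct toy_journal *journal = (struct toy_journal *) base;
	assert(entry->scheduled);	/* wait only on a scheduled entry */
	entry->res = journal->next_lsn++;
	return entry->res;
}
```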

Prerequisites: #1254
---
 src/box/box.cc    | 21 ++++++++++++++++++++-
 src/box/journal.c | 18 ++++++++++++++++++
 src/box/journal.h | 30 ++++++++++++++++++++++++++++++
 src/box/wal.c     | 42 +++++++++++++++++++++++++++++++++++-------
 4 files changed, 103 insertions(+), 8 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index 46cd444fd..88be886f3 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -311,10 +311,29 @@ recovery_journal_write(struct journal *base,
 	return vclock_sum(journal->vclock);
 }
 
+static int64_t
+recovery_journal_async_write(struct journal *base,
+			     struct journal_entry * /* entry */)
+{
+	(void) base;
+	return 0;
+}
+
+static int64_t
+recovery_journal_async_wait(struct journal *base,
+			    struct journal_entry * /* entry */)
+{
+	struct recovery_journal *journal = (struct recovery_journal *) base;
+	return vclock_sum(journal->vclock);
+}
+
 static inline void
 recovery_journal_create(struct recovery_journal *journal, struct vclock *v)
 {
-	journal_create(&journal->base, recovery_journal_write, NULL);
+	journal_create(&journal->base, recovery_journal_write,
+		       recovery_journal_async_write,
+		       recovery_journal_async_wait,
+		       NULL);
 	journal->vclock = v;
 }
 
diff --git a/src/box/journal.c b/src/box/journal.c
index b0f4d48b5..7ccbd8594 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -44,8 +44,26 @@ dummy_journal_write(struct journal *journal, struct journal_entry *entry)
 	return 0;
 }
 
+static int64_t
+dummy_async_write(struct journal *journal, struct journal_entry *entry)
+{
+	(void) journal;
+	(void) entry;
+	return 0;
+}
+
+static int64_t
+dummy_async_wait(struct journal *journal, struct journal_entry *entry)
+{
+	(void) journal;
+	(void) entry;
+	return 0;
+}
+
 static struct journal dummy_journal = {
 	dummy_journal_write,
+	dummy_async_write,
+	dummy_async_wait,
 	NULL,
 };
 
diff --git a/src/box/journal.h b/src/box/journal.h
index 4a2fb3585..0292d77f3 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -112,6 +112,10 @@ journal_entry_on_error(struct journal_entry *entry, struct trigger *trigger)
 struct journal {
 	int64_t (*write)(struct journal *journal,
 			 struct journal_entry *req);
+	int64_t (*async_write)(struct journal *journal,
+			       struct journal_entry *req);
+	int64_t (*async_wait)(struct journal *journal,
+			      struct journal_entry *req);
 	void (*destroy)(struct journal *journal);
 };
 
@@ -139,6 +143,28 @@ journal_write(struct journal_entry *entry)
 	return current_journal->write(current_journal, entry);
 }
 
+/**
+ * Schedule a single entry to be written, without waiting for the write to finish.
+ *
+ * @return   0 if write was scheduled or -1 on error.
+ */
+static inline int64_t
+journal_async_write(struct journal_entry *entry)
+{
+	return current_journal->async_write(current_journal, entry);
+}
+
+/**
+ * Wait until the entry has been processed.
+ * @return   a log sequence number (vclock signature) of the entry
+ *           or -1 on error.
+ */
+static inline int64_t
+journal_async_wait(struct journal_entry *entry)
+{
+	return current_journal->async_wait(current_journal, entry);
+}
+
 /**
  * Change the current implementation of the journaling API.
  * Happens during life cycle of an instance:
@@ -171,9 +197,13 @@ journal_set(struct journal *new_journal)
 static inline void
 journal_create(struct journal *journal,
 	       int64_t (*write)(struct journal *, struct journal_entry *),
+	       int64_t (*async_write)(struct journal *, struct journal_entry *),
+	       int64_t (*async_wait)(struct journal *, struct journal_entry *),
 	       void (*destroy)(struct journal *))
 {
 	journal->write = write;
+	journal->async_write = async_write;
+	journal->async_wait = async_wait;
 	journal->destroy = destroy;
 }
 
diff --git a/src/box/wal.c b/src/box/wal.c
index f0352e938..39b049b06 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -63,6 +63,12 @@ int wal_dir_lock = -1;
 static int64_t
 wal_write(struct journal *, struct journal_entry *);
 
+static int64_t
+wal_async_write(struct journal *, struct journal_entry *);
+
+static int64_t
+wal_async_wait(struct journal *, struct journal_entry *);
+
 static int64_t
 wal_write_in_wal_mode_none(struct journal *, struct journal_entry *);
 
@@ -362,7 +368,10 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
 	writer->wal_max_rows = wal_max_rows;
 	writer->wal_max_size = wal_max_size;
 	journal_create(&writer->base, wal_mode == WAL_NONE ?
-		       wal_write_in_wal_mode_none : wal_write, NULL);
+		       wal_write_in_wal_mode_none : wal_write,
+		       wal_mode == WAL_NONE ?
+		       wal_write_in_wal_mode_none : wal_async_write,
+		       wal_async_wait, NULL);
 
 	struct xlog_opts opts = xlog_opts_default;
 	opts.sync_is_async = true;
@@ -1135,12 +1144,8 @@ on_wal_write_done(struct trigger *trigger, void *event)
 	fiber_cond_signal(cond);
 }
 
-/**
- * WAL writer main entry point: queue a single request
- * to be written to disk and wait until this task is completed.
- */
 int64_t
-wal_write(struct journal *journal, struct journal_entry *entry)
+wal_async_write(struct journal *journal, struct journal_entry *entry)
 {
 	struct wal_writer *writer = (struct wal_writer *) journal;
 
@@ -1185,6 +1190,15 @@ wal_write(struct journal *journal, struct journal_entry *entry)
 	batch->approx_len += entry->approx_len;
 	writer->wal_pipe.n_input += entry->n_rows * XROW_IOVMAX;
 	cpipe_flush_input(&writer->wal_pipe);
+	return 0;
+}
+
+int64_t
+wal_async_wait(struct journal *journal, struct journal_entry *entry)
+{
+	(void) journal;
+	if (entry->done)
+		return entry->res;
 
 	struct fiber_cond done_cond;
 	fiber_cond_create(&done_cond);
@@ -1201,6 +1215,18 @@ wal_write(struct journal *journal, struct journal_entry *entry)
 	return entry->res;
 }
 
+/**
+ * WAL writer main entry point: queue a single request
+ * to be written to disk and wait until this task is completed.
+ */
+int64_t
+wal_write(struct journal *journal, struct journal_entry *entry)
+{
+	if (wal_async_write(journal, entry) != 0)
+		return -1;
+	return wal_async_wait(journal, entry);
+}
+
 int64_t
 wal_write_in_wal_mode_none(struct journal *journal,
 			   struct journal_entry *entry)
@@ -1212,7 +1238,9 @@ wal_write_in_wal_mode_none(struct journal *journal,
 		       entry->rows + entry->n_rows);
 	vclock_merge(&writer->vclock, &vclock_diff);
 	vclock_copy(&replicaset.vclock, &writer->vclock);
-	return vclock_sum(&writer->vclock);
+	entry->done = true;
+	entry->res = vclock_sum(&writer->vclock);
+	return entry->res;
 }
 
 void
-- 
2.21.0
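
To illustrate the contract introduced by this patch, here is a minimal, self-contained C sketch (not Tarantool code). `async_write` only schedules an entry and returns 0, `async_wait` returns the entry's result once it is done, and the blocking write is the composition of the two, as in the new `wal_write()`. All `toy_*` names, the fixed-size queue and `toy_flush()` (standing in for the WAL thread and the `fiber_cond` wakeup) are hypothetical; only the `done`/`res` pair mirrors fields the patch relies on.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Simplified stand-in for struct journal_entry. */
struct toy_entry {
	int n_rows;
	bool done;     /* set once the "WAL" has processed the entry */
	int64_t res;   /* vclock signature on success, -1 on error */
};

struct toy_journal {
	int64_t (*async_write)(struct toy_journal *, struct toy_entry *);
	int64_t (*async_wait)(struct toy_journal *, struct toy_entry *);
	struct toy_entry *queue[16]; /* hypothetical stand-in for the WAL pipe */
	int queue_len;
	int64_t signature;           /* running sum, like vclock_sum() */
};

/* "WAL thread": complete all queued entries in submission order. */
static void
toy_flush(struct toy_journal *j)
{
	for (int i = 0; i < j->queue_len; i++) {
		struct toy_entry *e = j->queue[i];
		j->signature += e->n_rows;
		e->res = j->signature;
		e->done = true;
	}
	j->queue_len = 0;
}

static int64_t
toy_async_write(struct toy_journal *j, struct toy_entry *e)
{
	if (j->queue_len == 16)
		return -1;          /* could not schedule */
	e->done = false;
	j->queue[j->queue_len++] = e;
	return 0;                   /* scheduled, not yet written */
}

static int64_t
toy_async_wait(struct toy_journal *j, struct toy_entry *e)
{
	/* The real wal_async_wait() sleeps on a fiber_cond; here the
	 * waiter drives the flush itself. */
	if (!e->done)
		toy_flush(j);
	return e->res;
}

/* Blocking write = schedule + wait, mirroring the new wal_write(). */
static int64_t
toy_write(struct toy_journal *j, struct toy_entry *e)
{
	if (j->async_write(j, e) != 0)
		return -1;
	return j->async_wait(j, e);
}
```

Because the wait returns immediately once `done` is set, an implementation that completes entries synchronously (as `wal_write_in_wal_mode_none()` now does) can share the same wait function.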


* [tarantool-patches] [PATCH 10/10] Introduce asynchronous txn commit
  2019-04-19 12:43 [tarantool-patches] [PATCH 00/10] Transaction refactoring Georgy Kirichenko
                   ` (8 preceding siblings ...)
  2019-04-19 12:44 ` [tarantool-patches] [PATCH 09/10] Enable asyncronous wal writes Georgy Kirichenko
@ 2019-04-19 12:44 ` Georgy Kirichenko
  2019-04-24 19:20   ` [tarantool-patches] " Konstantin Osipov
  9 siblings, 1 reply; 21+ messages in thread
From: Georgy Kirichenko @ 2019-04-19 12:44 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

Allow asynchronous transaction commit. This patch adds two functions:
 * txn_async_commit that sends a transaction to a journal
 * txn_async_wait that waits until transaction processing is done

Prerequisites: #1254
---
 src/box/txn.c | 121 +++++++++++++++++++++++++++++++++++++-------------
 src/box/txn.h |  10 +++++
 2 files changed, 99 insertions(+), 32 deletions(-)

diff --git a/src/box/txn.c b/src/box/txn.c
index 8f5b66480..eb57b861a 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -194,6 +194,7 @@ txn_begin()
 	txn->engine = NULL;
 	txn->engine_tx = NULL;
 	txn->psql_txn = NULL;
+	txn->entry = NULL;
 	/* fiber_on_yield/fiber_on_stop initialized by engine on demand */
 	fiber_set_txn(fiber(), txn);
 	return txn;
@@ -324,20 +325,24 @@ journal_write_error_cb(struct trigger *trigger, void *event)
 	txn->engine = NULL;
 }
 
+/*
+ * Send the txn to a journal.
+ */
 static int64_t
-txn_write_to_wal(struct txn *txn)
+txn_journal_async_write(struct txn *txn)
 {
+	assert(txn->entry == NULL);
 	assert(txn->n_new_rows + txn->n_applier_rows > 0);
 
+	/* Prepare a journal entry. */
 	struct journal_entry *req = journal_entry_new(txn->n_new_rows +
 						      txn->n_applier_rows,
 						      &txn->region);
 	if (req == NULL)
 		return -1;
 
-	struct trigger on_error;
-	trigger_create(&on_error, journal_write_error_cb, txn, NULL);
-	journal_entry_on_error(req, &on_error);
+	trigger_create(&txn->on_error, journal_write_error_cb, txn, NULL);
+	journal_entry_on_error(req, &txn->on_error);
 
 	struct txn_stmt *stmt;
 	struct xrow_header **remote_row = req->rows;
@@ -354,31 +359,34 @@ txn_write_to_wal(struct txn *txn)
 	assert(remote_row == req->rows + txn->n_applier_rows);
 	assert(local_row == remote_row + txn->n_new_rows);
 
-	ev_tstamp start = ev_monotonic_now(loop());
-	int64_t res = journal_write(req);
-	ev_tstamp stop = ev_monotonic_now(loop());
-
-	if (res < 0) {
-		/* Cascading rollback. */
-		txn_rollback(txn); /* Perform our part of cascading rollback. */
+	txn->entry = req;
+	/* Send entry to a journal. */
+	if (journal_async_write(txn->entry) < 0) {
 		diag_set(ClientError, ER_WAL_IO);
-		diag_log();
-	} else if (stop - start > too_long_threshold) {
-		int n_rows = txn->n_new_rows + txn->n_applier_rows;
-		say_warn_ratelimited("too long WAL write: %d rows at "
-				     "LSN %lld: %.3f sec", n_rows,
-				     res - n_rows + 1, stop - start);
+		return -1;
 	}
-	/*
-	 * Use vclock_sum() from WAL writer as transaction signature.
-	 */
-	return res;
+	return 0;
 }
 
-int
-txn_commit(struct txn *txn)
+/*
+ * Wait until the journal has finished processing the entry.
+ */
+static int64_t
+txn_journal_async_wait(struct txn *txn)
+{
+	assert(txn->entry != NULL);
+	txn->signature = journal_async_wait(txn->entry);
+	if (txn->signature < 0)
+		diag_set(ClientError, ER_WAL_IO);
+	return txn->signature;
+}
+
+/*
+ * Prepare a transaction using engines.
+ */
+static int
+txn_prepare(struct txn *txn)
 {
-	assert(txn == in_txn());
 	/*
 	 * If transaction has been started in SQL, deferred
 	 * foreign key constraints must not be violated.
@@ -388,7 +396,7 @@ txn_commit(struct txn *txn)
 		struct sql_txn *sql_txn = txn->psql_txn;
 		if (sql_txn->fk_deferred_count != 0) {
 			diag_set(ClientError, ER_FOREIGN_KEY_CONSTRAINT);
-			goto fail;
+			return -1;
 		}
 	}
 	/*
@@ -396,15 +404,54 @@ txn_commit(struct txn *txn)
 	 * we have a bunch of IPROTO_NOP statements.
 	 */
 	if (txn->engine != NULL) {
-		if (engine_prepare(txn->engine, txn) != 0)
-			goto fail;
+		if (engine_prepare(txn->engine, txn) != 0) {
+			return -1;
+		}
 	}
+	return 0;
+}
 
-	if (txn->n_new_rows + txn->n_applier_rows > 0) {
-		txn->signature = txn_write_to_wal(txn);
-		if (txn->signature < 0)
-			return -1;
+/*
+ * Send a transaction to a journal.
+ */
+int
+txn_async_commit(struct txn *txn)
+{
+	assert(txn == in_txn());
+	if (txn_prepare(txn) != 0)
+		goto fail;
+
+	txn->start_tm = ev_monotonic_now(loop());
+	if (txn->n_new_rows + txn->n_applier_rows == 0)
+		return 0;
+
+	if (txn_journal_async_write(txn) != 0)
+		goto fail;
+
+	return 0;
+fail:
+	txn_rollback(txn);
+	return -1;
+}
+
+/*
+ * Wait until transaction processing is finished.
+ */
+int
+txn_async_wait(struct txn *txn)
+{
+	if (txn->n_new_rows + txn->n_applier_rows > 0 &&
+	    txn_journal_async_wait(txn) < 0)
+		goto fail;
+	ev_tstamp stop_tm = ev_monotonic_now(loop());
+	if (stop_tm - txn->start_tm > too_long_threshold) {
+		int n_rows = txn->n_new_rows + txn->n_applier_rows;
+		say_warn_ratelimited("too long WAL write: %d rows at "
+				     "LSN %lld: %.3f sec", n_rows,
+				     txn->signature - n_rows + 1,
+				     stop_tm - txn->start_tm);
 	}
+
 	/*
 	 * The transaction is in the binary log. No action below
 	 * may throw. In case an error has happened, there is
@@ -417,7 +464,7 @@ txn_commit(struct txn *txn)
 		panic("commit trigger failed");
 	}
 	/*
-	 * Engine can be NULL if transaction contains IPROTO_NOP
+	 * Engine can be NULL if the transaction contains IPROTO_NOP
 	 * statements only.
 	 */
 	if (txn->engine != NULL)
@@ -430,11 +477,21 @@ txn_commit(struct txn *txn)
 	fiber_set_txn(fiber(), NULL);
 	txn_free(txn);
 	return 0;
+
 fail:
 	txn_rollback(txn);
 	return -1;
 }
 
+int
+txn_commit(struct txn *txn)
+{
+	if (txn_async_commit(txn) != 0 ||
+	    txn_async_wait(txn) < 0)
+		return -1;
+	return 0;
+}
+
 void
 txn_rollback_stmt(struct txn *txn)
 {
diff --git a/src/box/txn.h b/src/box/txn.h
index 96719638b..09f086fa4 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -187,6 +187,16 @@ struct txn {
 	 /** Commit and rollback triggers */
 	struct rlist on_commit, on_rollback;
 	struct sql_txn *psql_txn;
+	/**
+	 * Trigger to call if the journal write fails.
+	 */
+	struct trigger on_error;
+	/**
+	 * Journal entry to control txn write.
+	 */
+	struct journal_entry *entry;
+	/** Timestamp of the journal entry write start. */
+	ev_tstamp start_tm;
 };
 
 /* Pointer to the current transaction (if any) */
-- 
2.21.0
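
The txn-level split can be modeled the same way. This is a simplified C sketch, not the patch itself: `toy_txn_async_commit()` records the start time and hands the rows to a stand-in journal, `toy_txn_async_wait()` assigns the signature and applies the latency check, and the synchronous commit composes both. `toy_clock` replaces `ev_monotonic_now()`, and the 0.5 s threshold is an assumed value for `too_long_threshold`. It also shows the arithmetic behind the warning message: the first LSN of the transaction is `signature - n_rows + 1`.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical model of the txn fields used by the split commit. */
struct toy_txn {
	int n_rows;        /* n_new_rows + n_applier_rows */
	double start_tm;   /* set by the async commit */
	int64_t signature; /* vclock signature after the write */
	int pending;       /* 1 while a journal entry is in flight */
};

static double toy_clock;       /* stand-in for ev_monotonic_now() */
static int64_t toy_vclock_sum; /* stand-in for the WAL vclock sum */
static int toy_slow_warnings;  /* counts "too long WAL write" cases */

static const double too_long_threshold = 0.5; /* seconds; assumed */

/* Phase 1: remember the start time and hand the rows to the journal. */
static int
toy_txn_async_commit(struct toy_txn *txn)
{
	txn->start_tm = toy_clock;
	if (txn->n_rows == 0)
		return 0;              /* nothing to write */
	txn->pending = 1;
	return 0;
}

/* Phase 2: wait for the journal, then check the write latency. */
static int
toy_txn_async_wait(struct toy_txn *txn)
{
	if (txn->n_rows > 0 && txn->pending) {
		toy_vclock_sum += txn->n_rows; /* the "WAL" writes rows */
		txn->signature = toy_vclock_sum;
		txn->pending = 0;
	}
	if (toy_clock - txn->start_tm > too_long_threshold)
		toy_slow_warnings++;
	return 0;
}

/* The synchronous commit is the composition of both phases. */
static int
toy_txn_commit(struct toy_txn *txn)
{
	if (toy_txn_async_commit(txn) != 0 || toy_txn_async_wait(txn) < 0)
		return -1;
	return 0;
}

/* LSN of the first row, as reported in the slow-write warning. */
static int64_t
toy_first_lsn(const struct toy_txn *txn)
{
	return txn->signature - txn->n_rows + 1;
}
```

Storing `start_tm` in the transaction is what lets the latency check survive the split, since the two phases may now run at different times.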


* [tarantool-patches] Re: [PATCH 01/10] Introduce a txn memory region
  2019-04-19 12:43 ` [tarantool-patches] [PATCH 01/10] Introduce a txn memory region Georgy Kirichenko
@ 2019-04-24 18:20   ` Konstantin Osipov
  0 siblings, 0 replies; 21+ messages in thread
From: Konstantin Osipov @ 2019-04-24 18:20 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

* Georgy Kirichenko <georgy@tarantool.org> [19/04/19 19:59]:
> Attach a separate memory region for each txn structure in order to store
> all txn internal data until the transaction finished. This patch is a
> preparation to detach a txn from a fiber and a fiber gc storage.

The patch is OK to push.

See two minor comments below.

>  struct txn {
> +	struct stailq_entry list;
Please rename to in_txn_cache

> +	struct region region;

Please provide meaningful comments for both members: e.g. say that
we can have more than one transaction per active fiber in case
of a parallel applier for replication changes, and that we want to
detach a transaction from a fiber to allow interactive transactions.

In theory this patch increases the amount of memory locked to the
runtime pool, but since this is O(1) right now I would ignore
this.

As a follow up patch please consider optimizing txn_new() for
cases when txn is taken from the cache: some of the members are
cleared by txn_commit()/txn_rollback() so do not need to be set
again. This is a very hot path!
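
One possible shape for the follow-up suggested above, sketched in C under assumptions: cached transactions sit on an intrusive free list linked through a member named `in_txn_cache` (the rename proposed in this review), the free path resets state once, and taking a cached object only clears the list link. The `toy_*` names and the 4096-byte buffer standing in for the per-txn `struct region` are hypothetical.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical transaction object with an intrusive cache link. */
struct toy_txn {
	struct toy_txn *in_txn_cache; /* next cached txn (name per review) */
	char *region_buf;             /* stands in for the per-txn region */
	size_t region_used;
	int n_rows;
};

static struct toy_txn *txn_cache; /* head of the free list */

static struct toy_txn *
toy_txn_new(void)
{
	struct toy_txn *txn = txn_cache;
	if (txn != NULL) {
		/* Cached objects were reset when freed, so only the
		 * list link needs clearing on this hot path. */
		txn_cache = txn->in_txn_cache;
		txn->in_txn_cache = NULL;
		return txn;
	}
	txn = calloc(1, sizeof(*txn));
	if (txn == NULL)
		return NULL;
	txn->region_buf = malloc(4096);
	if (txn->region_buf == NULL) {
		free(txn);
		return NULL;
	}
	return txn;
}

static void
toy_txn_free(struct toy_txn *txn)
{
	/* Reset once here and keep the region memory for reuse. */
	txn->region_used = 0;
	txn->n_rows = 0;
	txn->in_txn_cache = txn_cache;
	txn_cache = txn;
}
```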


-- 
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov


* [tarantool-patches] Re: [PATCH 02/10] Alloc journal entry on a txn memory region
  2019-04-19 12:43 ` [tarantool-patches] [PATCH 02/10] Alloc journal entry on " Georgy Kirichenko
@ 2019-04-24 18:21   ` Konstantin Osipov
  0 siblings, 0 replies; 21+ messages in thread
From: Konstantin Osipov @ 2019-04-24 18:21 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

* Georgy Kirichenko <georgy@tarantool.org> [19/04/19 20:00]:
> Use txn memory to allocate a journal entry structure.
> This relaxes a dependency between a journal entry and a fiber.
> 
> Prerequisites: #1254

OK to push.



* [tarantool-patches] Re: [PATCH 03/10] Encode a dml statement to a transaction memory region
  2019-04-19 12:43 ` [tarantool-patches] [PATCH 03/10] Encode a dml statement to a transaction " Georgy Kirichenko
@ 2019-04-24 18:28   ` Konstantin Osipov
  0 siblings, 0 replies; 21+ messages in thread
From: Konstantin Osipov @ 2019-04-24 18:28 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

* Georgy Kirichenko <georgy@tarantool.org> [19/04/19 19:59]:
> Prerequisites: #1254
> ---
>  src/box/request.c |  2 +-
>  src/box/txn.c     |  2 +-
>  src/box/vy_log.c  |  2 +-
>  src/box/vy_stmt.c |  6 ++++--
>  src/box/xrow.c    | 21 +++++++++++++++------
>  src/box/xrow.h    | 12 +++++++++---
>  6 files changed, 31 insertions(+), 14 deletions(-)
> 
> diff --git a/src/box/request.c b/src/box/request.c
> index 9c684af73..9edd1f8b1 100644
> --- a/src/box/request.c
> +++ b/src/box/request.c
> @@ -61,7 +61,7 @@ request_update_header(struct request *request, struct xrow_header *row)
>  	if (row == NULL)
>  		return 0;
>  	row->type = request->type;
> -	row->bodycnt = xrow_encode_dml(request, row->body);
> +	row->bodycnt = xrow_encode_dml(request, &fiber()->gc, false, row->body);

I don't understand the business with copy_tuple. Why do you need
it? Why not always copy the tuple? Why not have a separate wrapper
which would copy the body, e.g. xrow_encode_dml_and_copy_body()?

request_update_header() should be using txn memory; I don't see
why it uses the fiber region (&fiber()->gc) in your patch. I haven't
seen the subsequent patches, I hope it is addressed there. If not,
please do.
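
A hypothetical sketch of the wrapper proposed above (`xrow_encode_dml_and_copy_body()` is only a suggested name, not an existing function): a plain encoder that references the caller's data, plus a wrapper that copies the body into a caller-supplied region so it outlives the original buffer. The bump allocator and iovec-like struct are simplified stand-ins for Tarantool's `struct region` and `struct iovec`, and the real encoder handles more than one field.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical bump allocator standing in for a txn memory region. */
struct toy_region {
	char buf[1024];
	size_t used;
};

static void *
toy_region_alloc(struct toy_region *r, size_t size)
{
	if (size > sizeof(r->buf) - r->used)
		return NULL;
	void *p = r->buf + r->used;
	r->used += size;
	return p;
}

struct toy_iov {
	const void *base;
	size_t len;
};

/* Plain encoder: the body only references the caller's tuple data. */
static int
toy_encode_dml(const char *tuple, size_t len, struct toy_iov *body)
{
	body->base = tuple;
	body->len = len;
	return 1; /* number of iovecs used */
}

/* Wrapper: encode, then copy the body into the region. */
static int
toy_encode_dml_and_copy_body(const char *tuple, size_t len,
			     struct toy_region *region, struct toy_iov *body)
{
	int cnt = toy_encode_dml(tuple, len, body);
	void *copy = toy_region_alloc(region, body->len);
	if (copy == NULL)
		return -1;
	memcpy(copy, body->base, body->len);
	body->base = copy;
	return cnt;
}
```

Keeping the copy in a separate wrapper avoids a boolean flag on the encoder, which is the point of the question about `copy_tuple`.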


* [tarantool-patches] Re: [PATCH 04/10] Get rid of autocommit from a txn structure
  2019-04-19 12:44 ` [tarantool-patches] [PATCH 04/10] Get rid of autocommit from a txn structure Georgy Kirichenko
@ 2019-04-24 19:07   ` Konstantin Osipov
  0 siblings, 0 replies; 21+ messages in thread
From: Konstantin Osipov @ 2019-04-24 19:07 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

* Georgy Kirichenko <georgy@tarantool.org> [19/04/19 19:59]:
>  box_process_rw(struct request *request, struct space *space,
>  	       struct tuple **result)
>  {
> +	struct tuple *tuple = NULL;
> +	struct txn *txn = in_txn();
> +	bool autocommit = txn == NULL;
> +	if (txn == NULL && (txn = txn_begin()) == NULL)

nit: why not use if (autocommit && (txn = txn_begin()) == NULL)
instead?
> +		return -1;
>  	assert(iproto_type_is_dml(request->type));
>  	rmean_collect(rmean_box, request->type, 1);
>  	if (access_check_space(space, PRIV_W) != 0)
> -		return -1;
> -	struct txn *txn = txn_begin_stmt(space);
> -	if (txn == NULL)
> -		return -1;
> -	struct tuple *tuple;
> +		goto fail;
> +	if (txn_begin_stmt(space) == NULL)
> +		goto fail;

I guess txn_begin_stmt() can receive txn explicitly now as well,
since we always fetch fiber->txn before calling txn_begin_stmt().
This will save us one fiber key fetch.

>  	if (space_execute_dml(space, txn, request, &tuple) != 0) {
>  		txn_rollback_stmt();
> -		return -1;
> +		goto fail;
>  	}
> -	if (result == NULL)
> -		return txn_commit_stmt(txn, request);
> -	*result = tuple;
> -	if (tuple == NULL)
> -		return txn_commit_stmt(txn, request);
>  	/*
>  	 * Pin the tuple locally before the commit,
>  	 * otherwise it may go away during yield in
>  	 * when WAL is written in autocommit mode.
>  	 */
> -	tuple_ref(tuple);
> -	int rc = txn_commit_stmt(txn, request);
> -	if (rc == 0)
> +	if (tuple != NULL)
> +		tuple_ref(tuple);
> +
> +	if (result != NULL)
> +		*result = tuple;
> +
> +	if (txn_commit_stmt(txn, request) != 0)
> +		goto fail;
> +	if (autocommit) {
> +		if (txn_commit(txn) != 0)
> +			goto fail;
> +		fiber_gc();
> +	}
> +
> +	if (tuple != NULL && result != NULL) {
>  		tuple_bless(tuple);
> -	tuple_unref(tuple);

The code here now looks quite messy. I think it would be easier to
read if we split it between if (autocommit) ... else ... branches:

if (tuple == NULL) {
    if (txn_commit_stmt(txn, request))
        goto fail;
    return is_autocommit ? txn_commit(txn) : 0;
}
tuple_ref(tuple);

if (txn_commit_stmt(txn, request))
    goto fail;
if (is_autocommit) {
    if (txn_commit(txn))
        goto fail;
    fiber_gc();
}
tuple_bless(tuple);
if (result)
    *result = tuple;
tuple_unref(tuple);
return 0;

The patch is generally looking good, please fix the above nits and
it will be good to go.
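
The ordering this outline relies on can be shown in isolation. In the following C sketch with stub types (all `toy_*` names are hypothetical), the result tuple is pinned before any commit that may yield, the statement is committed, the whole transaction is committed only in autocommit mode, and the tuple is blessed and unpinned afterwards. Blessing is modeled as a simple flag and rollback handling is omitted.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-ins for a tuple and a transaction. */
struct toy_tuple {
	int refs;
	bool blessed;
};

struct toy_txn {
	int n_stmts;
	bool committed;
};

static void toy_tuple_ref(struct toy_tuple *t) { t->refs++; }
static void toy_tuple_unref(struct toy_tuple *t) { t->refs--; }
static void toy_tuple_bless(struct toy_tuple *t) { t->blessed = true; }

/* Stubs that always succeed; the real calls may fail or yield. */
static int
toy_txn_commit_stmt(struct toy_txn *txn)
{
	txn->n_stmts++;
	return 0;
}

static int
toy_txn_commit(struct toy_txn *txn)
{
	txn->committed = true;
	return 0;
}

/* Tail of a DML request after the statement has been executed. */
static int
toy_finish_rw(struct toy_txn *txn, bool is_autocommit,
	      struct toy_tuple *tuple, struct toy_tuple **result)
{
	if (result != NULL)
		*result = tuple;
	/* Pin the tuple: the WAL write inside the commit may yield. */
	if (tuple != NULL)
		toy_tuple_ref(tuple);
	if (toy_txn_commit_stmt(txn) != 0)
		goto fail;
	if (is_autocommit && toy_txn_commit(txn) != 0)
		goto fail;
	if (tuple != NULL) {
		if (result != NULL)
			toy_tuple_bless(tuple);
		toy_tuple_unref(tuple);
	}
	return 0;
fail:
	/* Drop the pin; rollback is left out of this sketch. */
	if (tuple != NULL)
		toy_tuple_unref(tuple);
	return -1;
}
```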


* [tarantool-patches] Re: [PATCH 05/10] Get rid of fiber_gc from txn_rollback
  2019-04-19 12:44 ` [tarantool-patches] [PATCH 05/10] Get rid of fiber_gc from txn_rollback Georgy Kirichenko
@ 2019-04-24 19:12   ` Konstantin Osipov
  0 siblings, 0 replies; 21+ messages in thread
From: Konstantin Osipov @ 2019-04-24 19:12 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

* Georgy Kirichenko <georgy@tarantool.org> [19/04/19 19:59]:

The patch is LGTM, please see one comment below.

> index fabb4bb48..1f6b30f4a 100644
> --- a/src/box/vy_scheduler.c
> +++ b/src/box/vy_scheduler.c
> @@ -889,8 +889,10 @@ vy_deferred_delete_batch_process_f(struct cmsg *cmsg)
>  	for (int i = 0; i < batch->count; i++) {
>  		if (vy_deferred_delete_process_one(deferred_delete_space,
>  						   pk->space_id, pk->mem_format,
> -						   &batch->stmt[i]) != 0)
> +						   &batch->stmt[i]) != 0) {
> +			txn_rollback(txn);
>  			goto fail;
> +		}
>  	}
>  

I would add a new label, fail_rollback; this would be more in line
with vinyl style.

>  	if (txn_commit(txn) != 0)
> @@ -900,7 +902,7 @@ vy_deferred_delete_batch_process_f(struct cmsg *cmsg)
>  fail:
>  	batch->is_failed = true;
>  	diag_move(diag_get(), &batch->diag);
> -	txn_rollback();
> +	fiber_gc();
>  }
>  
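
A small C sketch of the two-label layout suggested above, assuming (as `txn_commit()` does in this series) that a failed commit rolls the transaction back itself. Loop failures jump to `fail_rollback`, which rolls back and falls through to the shared `fail` cleanup, while a commit failure jumps straight to `fail`. The batch and transaction types are hypothetical stand-ins for the vinyl structures.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-ins for the deferred DELETE batch and its txn. */
struct toy_batch {
	int count;
	int bad_index;    /* index of the statement that fails, or -1 */
	bool is_failed;
};

struct toy_txn {
	bool fail_commit; /* force the commit to fail, for testing */
	int rollbacks;
	bool committed;
};

static int
toy_process_one(const struct toy_batch *batch, int i)
{
	return i == batch->bad_index ? -1 : 0;
}

static void
toy_txn_rollback(struct toy_txn *txn)
{
	txn->rollbacks++;
}

/* Like txn_commit(): rolls the transaction back itself on failure. */
static int
toy_txn_commit(struct toy_txn *txn)
{
	if (txn->fail_commit) {
		toy_txn_rollback(txn);
		return -1;
	}
	txn->committed = true;
	return 0;
}

static void
toy_batch_process(struct toy_batch *batch, struct toy_txn *txn)
{
	for (int i = 0; i < batch->count; i++) {
		if (toy_process_one(batch, i) != 0)
			goto fail_rollback;
	}
	if (toy_txn_commit(txn) != 0)
		goto fail; /* already rolled back by the commit */
	return;
fail_rollback:
	toy_txn_rollback(txn);
	/* fall through */
fail:
	batch->is_failed = true;
}
```

Each error path rolls back exactly once, which is the property the extra label protects.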


* [tarantool-patches] Re: [PATCH 06/10] Require for txn in case of txn_begin_stmt/txn_rollback_stmt
  2019-04-19 12:44 ` [tarantool-patches] [PATCH 06/10] Require for txn in case of txn_begin_stmt/txn_rollback_stmt Georgy Kirichenko
@ 2019-04-24 19:13   ` Konstantin Osipov
  0 siblings, 0 replies; 21+ messages in thread
From: Konstantin Osipov @ 2019-04-24 19:13 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

* Georgy Kirichenko <georgy@tarantool.org> [19/04/19 20:00]:
> Pass a txn structure to txn_begin_stmt and txn_rollback_stmt functions.

The patch is OK to push except the hunks related to on_error
triggers, they should be submitted separately.



* [tarantool-patches] Re: [PATCH 07/10] Remove fiber from a journal_entry structure
  2019-04-19 12:44 ` [tarantool-patches] [PATCH 07/10] Remove fiber from a journal_entry structure Georgy Kirichenko
@ 2019-04-24 19:16   ` Konstantin Osipov
  0 siblings, 0 replies; 21+ messages in thread
From: Konstantin Osipov @ 2019-04-24 19:16 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

* Georgy Kirichenko <georgy@tarantool.org> [19/04/19 20:00]:
> Use a trigger to handle journal entry write done event. This relaxes
> friction between fiber and transaction life cycles.

I think the idea of journal entry triggers is great, but I also
think it's a bit of an overkill to always initialize triggers and
run all fibers through the cond. It would be ideal if we kept using
simple journal_entry objects for most cases and used journal_entry
with triggers only when we need these triggers. Would that be
possible? We could keep the trigger-related members in the
journal_entry class and initialize and use them strictly when
needed, or we could make a derived structure which contains the
trigger state and actions.
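
The second option, a derived structure, might look like the following C sketch. It is a simplification under assumptions: a plain entry carries only an optional completion hook, the derived type embeds the plain entry as its first member and adds trigger state, and the callback recovers the outer object with a `container_of`-style macro. A single function pointer stands in for Tarantool's trigger lists, and all `toy_*` names are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Plain entry: enough for most callers. */
struct toy_entry {
	int64_t res;
	bool done;
	/* Optional completion hook; NULL for plain entries. */
	void (*on_done)(struct toy_entry *entry);
};

/* Derived entry with trigger state, used only when needed. */
struct toy_entry_with_trigger {
	struct toy_entry base; /* usable wherever a plain entry is */
	int fired;             /* hypothetical trigger state */
};

#define toy_container_of(ptr, type, member) \
	((type *)((char *)(ptr) - offsetof(type, member)))

static void
toy_trigger_cb(struct toy_entry *entry)
{
	struct toy_entry_with_trigger *e =
		toy_container_of(entry, struct toy_entry_with_trigger, base);
	e->fired++;
}

/* Journal completion: the hook runs only if one is installed. */
static void
toy_complete(struct toy_entry *entry, int64_t res)
{
	entry->res = res;
	entry->done = true;
	if (entry->on_done != NULL)
		entry->on_done(entry);
}
```

Plain entries pay only for one NULL check on completion, while callers that need triggers opt in by allocating the larger type.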



* [tarantool-patches] Re: [PATCH 08/10] Use mempool to alloc wal messages
  2019-04-19 12:44 ` [tarantool-patches] [PATCH 08/10] Use mempool to alloc wal messages Georgy Kirichenko
@ 2019-04-24 19:18   ` Konstantin Osipov
  0 siblings, 0 replies; 21+ messages in thread
From: Konstantin Osipov @ 2019-04-24 19:18 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

* Georgy Kirichenko <georgy@tarantool.org> [19/04/19 19:59]:
> Don't use fiber gc region to alloc wal messages. This relaxes friction
> between fiber life cycle and transaction processing.
> 

LGTM.



* [tarantool-patches] Re: [PATCH 09/10] Enable asyncronous wal writes
  2019-04-19 12:44 ` [tarantool-patches] [PATCH 09/10] Enable asyncronous wal writes Georgy Kirichenko
@ 2019-04-24 19:19   ` Konstantin Osipov
  0 siblings, 0 replies; 21+ messages in thread
From: Konstantin Osipov @ 2019-04-24 19:19 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

* Georgy Kirichenko <georgy@tarantool.org> [19/04/19 20:00]:
> Allow sending a journal entry to wal without waiting until the write
> is finished. Two methods were introduced:
>  * async_write method emits an entry to be written, returns 0 if the
>  entry was successfully scheduled;
>  * async_wait method waits until writing was finished and returns a
>  result of journal write.
> 

The approach looks good; I may have a few nits about the API
(will send them separately, in a second review).



* [tarantool-patches] Re: [PATCH 10/10] Introduce asynchronous txn commit
  2019-04-19 12:44 ` [tarantool-patches] [PATCH 10/10] Introduce asynchronous txn commit Georgy Kirichenko
@ 2019-04-24 19:20   ` Konstantin Osipov
  0 siblings, 0 replies; 21+ messages in thread
From: Konstantin Osipov @ 2019-04-24 19:20 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

* Georgy Kirichenko <georgy@tarantool.org> [19/04/19 20:00]:
> Allow asynchronous transaction commit. This adds two functions:
>  * txn_async_commit that sends a transaction to a journal
>  * txn_async_wait that waits until the transaction processing was done
> 

I see where you're heading. Could you please submit a parallel
applier patch using this stack? As you can see, most of the patches
look good. Thank you for the great work!
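
As a rough illustration of how an applier might use `txn_async_commit()`/`txn_async_wait()`, here is a hypothetical C sketch of a bounded in-flight window: each new transaction is submitted without waiting, and only when the window is full does the applier wait for the oldest one. This shows pipelining rather than full parallelism; the window size, in-order completion and the `toy_*` names are assumptions, not part of the series.

```c
#include <assert.h>
#include <stdint.h>

#define WINDOW 4 /* hypothetical max number of transactions in flight */

/* Hypothetical applier state; commits complete in submission order. */
struct toy_applier {
	int64_t in_flight[WINDOW]; /* ring buffer of pending txn ids */
	int head;
	int count;
	int64_t last_acked;        /* newest txn id confirmed written */
	int max_seen_in_flight;
};

/* Stand-in for txn_async_wait() on the oldest pending transaction. */
static void
toy_wait_oldest(struct toy_applier *a)
{
	a->last_acked = a->in_flight[a->head];
	a->head = (a->head + 1) % WINDOW;
	a->count--;
}

/* Stand-in for txn_async_commit(): enqueue without waiting, but first
 * wait for the oldest transaction if the window is full. */
static void
toy_submit(struct toy_applier *a, int64_t txn_id)
{
	if (a->count == WINDOW)
		toy_wait_oldest(a);
	a->in_flight[(a->head + a->count) % WINDOW] = txn_id;
	a->count++;
	if (a->count > a->max_seen_in_flight)
		a->max_seen_in_flight = a->count;
}

/* Wait for everything still in flight, e.g. before stopping. */
static void
toy_drain(struct toy_applier *a)
{
	while (a->count > 0)
		toy_wait_oldest(a);
}
```

Bounding the window keeps the memory held by in-flight transactions (each with its own region after this series) under control.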



Thread overview: 21+ messages
2019-04-19 12:43 [tarantool-patches] [PATCH 00/10] Transaction refactoring Georgy Kirichenko
2019-04-19 12:43 ` [tarantool-patches] [PATCH 01/10] Introduce a txn memory region Georgy Kirichenko
2019-04-24 18:20   ` [tarantool-patches] " Konstantin Osipov
2019-04-19 12:43 ` [tarantool-patches] [PATCH 02/10] Alloc journal entry on " Georgy Kirichenko
2019-04-24 18:21   ` [tarantool-patches] " Konstantin Osipov
2019-04-19 12:43 ` [tarantool-patches] [PATCH 03/10] Encode a dml statement to a transaction " Georgy Kirichenko
2019-04-24 18:28   ` [tarantool-patches] " Konstantin Osipov
2019-04-19 12:44 ` [tarantool-patches] [PATCH 04/10] Get rid of autocommit from a txn structure Georgy Kirichenko
2019-04-24 19:07   ` [tarantool-patches] " Konstantin Osipov
2019-04-19 12:44 ` [tarantool-patches] [PATCH 05/10] Get rid of fiber_gc from txn_rollback Georgy Kirichenko
2019-04-24 19:12   ` [tarantool-patches] " Konstantin Osipov
2019-04-19 12:44 ` [tarantool-patches] [PATCH 06/10] Require for txn in case of txn_begin_stmt/txn_rollback_stmt Georgy Kirichenko
2019-04-24 19:13   ` [tarantool-patches] " Konstantin Osipov
2019-04-19 12:44 ` [tarantool-patches] [PATCH 07/10] Remove fiber from a journal_entry structure Georgy Kirichenko
2019-04-24 19:16   ` [tarantool-patches] " Konstantin Osipov
2019-04-19 12:44 ` [tarantool-patches] [PATCH 08/10] Use mempool to alloc wal messages Georgy Kirichenko
2019-04-24 19:18   ` [tarantool-patches] " Konstantin Osipov
2019-04-19 12:44 ` [tarantool-patches] [PATCH 09/10] Enable asyncronous wal writes Georgy Kirichenko
2019-04-24 19:19   ` [tarantool-patches] " Konstantin Osipov
2019-04-19 12:44 ` [tarantool-patches] [PATCH 10/10] Introduce asynchronous txn commit Georgy Kirichenko
2019-04-24 19:20   ` [tarantool-patches] " Konstantin Osipov
