* [tarantool-patches] [PATCH 01/10] Introduce a txn memory region
2019-04-19 12:43 [tarantool-patches] [PATCH 00/10] Transaction refactoring Georgy Kirichenko
@ 2019-04-19 12:43 ` Georgy Kirichenko
2019-04-24 18:20 ` [tarantool-patches] " Konstantin Osipov
2019-04-19 12:43 ` [tarantool-patches] [PATCH 02/10] Alloc journal entry on " Georgy Kirichenko
` (8 subsequent siblings)
9 siblings, 1 reply; 21+ messages in thread
From: Georgy Kirichenko @ 2019-04-19 12:43 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
Attach a separate memory region to each txn structure in order to store
all txn internal data until the transaction is finished. This patch is a
preparation for detaching a txn from a fiber and from the fiber gc storage.
Prerequisites: #1254
---
src/box/errcode.h | 2 +-
src/box/sql/vdbe.c | 2 +-
src/box/txn.c | 71 +++++++++++++++++++++++++++++-------
src/box/txn.h | 2 +
test/box/misc.result | 2 +-
test/engine/savepoint.result | 2 +-
test/sql/savepoints.result | 6 +--
7 files changed, 66 insertions(+), 21 deletions(-)
diff --git a/src/box/errcode.h b/src/box/errcode.h
index 3f8cb8e0e..a273826b3 100644
--- a/src/box/errcode.h
+++ b/src/box/errcode.h
@@ -166,7 +166,7 @@ struct errcode_record {
/*111 */_(ER_WRONG_SPACE_OPTIONS, "Wrong space options (field %u): %s") \
/*112 */_(ER_UNSUPPORTED_INDEX_FEATURE, "Index '%s' (%s) of space '%s' (%s) does not support %s") \
/*113 */_(ER_VIEW_IS_RO, "View '%s' is read-only") \
- /*114 */_(ER_SAVEPOINT_NO_TRANSACTION, "Can not set a savepoint in absence of active transaction") \
+ /*114 */_(ER_NO_TRANSACTION, "No active transaction") \
/*115 */_(ER_SYSTEM, "%s") \
/*116 */_(ER_LOADING, "Instance bootstrap hasn't finished yet") \
/*117 */_(ER_CONNECTION_TO_SELF, "Connection to self") \
diff --git a/src/box/sql/vdbe.c b/src/box/sql/vdbe.c
index ed7bf8870..9fc70980c 100644
--- a/src/box/sql/vdbe.c
+++ b/src/box/sql/vdbe.c
@@ -3017,7 +3017,7 @@ case OP_Savepoint: {
if (psql_txn == NULL) {
assert(!box_txn());
- diag_set(ClientError, ER_SAVEPOINT_NO_TRANSACTION);
+ diag_set(ClientError, ER_NO_TRANSACTION);
rc = SQL_TARANTOOL_ERROR;
goto abort_due_to_error;
}
diff --git a/src/box/txn.c b/src/box/txn.c
index d51bfc67a..815199645 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -37,6 +37,9 @@
double too_long_threshold;
+/* Txn cache. */
+static struct stailq txn_cache = {NULL, &txn_cache.first};
+
static inline void
fiber_set_txn(struct fiber *fiber, struct txn *txn)
{
@@ -44,7 +47,7 @@ fiber_set_txn(struct fiber *fiber, struct txn *txn)
}
static int
-txn_add_redo(struct txn_stmt *stmt, struct request *request)
+txn_add_redo(struct txn *txn, struct txn_stmt *stmt, struct request *request)
{
stmt->row = request->header;
if (request->header != NULL)
@@ -52,7 +55,7 @@ txn_add_redo(struct txn_stmt *stmt, struct request *request)
/* Create a redo log row for Lua requests */
struct xrow_header *row;
- row = region_alloc_object(&fiber()->gc, struct xrow_header);
+ row = region_alloc_object(&txn->region, struct xrow_header);
if (row == NULL) {
diag_set(OutOfMemory, sizeof(*row),
"region", "struct xrow_header");
@@ -77,7 +80,7 @@ static struct txn_stmt *
txn_stmt_new(struct txn *txn)
{
struct txn_stmt *stmt;
- stmt = region_alloc_object(&fiber()->gc, struct txn_stmt);
+ stmt = region_alloc_object(&txn->region, struct txn_stmt);
if (stmt == NULL) {
diag_set(OutOfMemory, sizeof(*stmt),
"region", "struct txn_stmt");
@@ -134,16 +137,50 @@ txn_rollback_to_svp(struct txn *txn, struct stailq_entry *svp)
}
}
+/*
+ * Return a txn from cache or create a new one if cache is empty.
+ */
+inline static struct txn *
+txn_new()
+{
+ if (!stailq_empty(&txn_cache))
+ return stailq_shift_entry(&txn_cache, struct txn, list);
+
+ /* Create a region. */
+ struct region region;
+ region_create(&region, &cord()->slabc);
+
+ /* Place txn structure on the region. */
+ struct txn *txn = region_alloc_object(&region, struct txn);
+ if (txn == NULL) {
+ diag_set(OutOfMemory, sizeof(*txn), "region", "struct txn");
+ return NULL;
+ }
+ assert(region_used(&region) == sizeof(*txn));
+ txn->region = region;
+ return txn;
+}
+
+/*
+ * Free txn memory and return it to a cache.
+ */
+inline static void
+txn_free(struct txn *txn)
+{
+ /* Truncate region up to struct txn size. */
+ region_truncate(&txn->region, sizeof(struct txn));
+ stailq_add(&txn_cache, &txn->list);
+}
+
struct txn *
txn_begin(bool is_autocommit)
{
static int64_t tsn = 0;
assert(! in_txn());
- struct txn *txn = region_alloc_object(&fiber()->gc, struct txn);
- if (txn == NULL) {
- diag_set(OutOfMemory, sizeof(*txn), "region", "struct txn");
+ struct txn *txn = txn_new();
+ if (txn == NULL)
return NULL;
- }
+
/* Initialize members explicitly to save time on memset() */
stailq_create(&txn->stmts);
txn->n_new_rows = 0;
@@ -251,7 +288,7 @@ txn_commit_stmt(struct txn *txn, struct request *request)
* stmt->space can be NULL for IRPOTO_NOP.
*/
if (stmt->space == NULL || !space_is_temporary(stmt->space)) {
- if (txn_add_redo(stmt, request) != 0)
+ if (txn_add_redo(txn, stmt, request) != 0)
goto fail;
assert(stmt->row != NULL);
if (stmt->row->replica_id == 0) {
@@ -393,8 +430,8 @@ txn_commit(struct txn *txn)
stailq_foreach_entry(stmt, &txn->stmts, next)
txn_stmt_unref_tuples(stmt);
- TRASH(txn);
fiber_set_txn(fiber(), NULL);
+ txn_free(txn);
return 0;
fail:
txn_rollback();
@@ -433,10 +470,10 @@ txn_rollback()
stailq_foreach_entry(stmt, &txn->stmts, next)
txn_stmt_unref_tuples(stmt);
- TRASH(txn);
/** Free volatile txn memory. */
fiber_gc();
fiber_set_txn(fiber(), NULL);
+ txn_free(txn);
}
void
@@ -521,12 +558,18 @@ box_txn_rollback()
void *
box_txn_alloc(size_t size)
{
+ struct txn *txn = in_txn();
+ if (txn == NULL) {
+ /* There are no transaction yet - return an error. */
+ diag_set(ClientError, ER_NO_TRANSACTION);
+ return NULL;
+ }
union natural_align {
void *p;
double lf;
long l;
};
- return region_aligned_alloc(&fiber()->gc, size,
+ return region_aligned_alloc(&txn->region, size,
alignof(union natural_align));
}
@@ -535,11 +578,11 @@ box_txn_savepoint()
{
struct txn *txn = in_txn();
if (txn == NULL) {
- diag_set(ClientError, ER_SAVEPOINT_NO_TRANSACTION);
+ diag_set(ClientError, ER_NO_TRANSACTION);
return NULL;
}
struct txn_savepoint *svp =
- (struct txn_savepoint *) region_alloc_object(&fiber()->gc,
+ (struct txn_savepoint *) region_alloc_object(&txn->region,
struct txn_savepoint);
if (svp == NULL) {
diag_set(OutOfMemory, sizeof(*svp),
@@ -558,7 +601,7 @@ box_txn_rollback_to_savepoint(box_txn_savepoint_t *svp)
{
struct txn *txn = in_txn();
if (txn == NULL) {
- diag_set(ClientError, ER_SAVEPOINT_NO_TRANSACTION);
+ diag_set(ClientError, ER_NO_TRANSACTION);
return -1;
}
struct txn_stmt *stmt = svp->stmt == NULL ? NULL :
diff --git a/src/box/txn.h b/src/box/txn.h
index 747b38db3..fe8a299f2 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -132,6 +132,8 @@ struct autoinc_id_entry {
};
struct txn {
+ struct stailq_entry list;
+ struct region region;
/**
* A sequentially growing transaction id, assigned when
* a transaction is initiated. Used to identify
diff --git a/test/box/misc.result b/test/box/misc.result
index a1f7a0990..d7d76d87e 100644
--- a/test/box/misc.result
+++ b/test/box/misc.result
@@ -443,7 +443,7 @@ t;
111: box.error.WRONG_SPACE_OPTIONS
112: box.error.UNSUPPORTED_INDEX_FEATURE
113: box.error.VIEW_IS_RO
- 114: box.error.SAVEPOINT_NO_TRANSACTION
+ 114: box.error.NO_TRANSACTION
115: box.error.SYSTEM
116: box.error.LOADING
117: box.error.CONNECTION_TO_SELF
diff --git a/test/engine/savepoint.result b/test/engine/savepoint.result
index b3a918de0..86a2d0be2 100644
--- a/test/engine/savepoint.result
+++ b/test/engine/savepoint.result
@@ -14,7 +14,7 @@ s1 = nil
...
s1 = box.savepoint()
---
-- error: Can not set a savepoint in absence of active transaction
+- error: No active transaction
...
box.rollback_to_savepoint(s1)
---
diff --git a/test/sql/savepoints.result b/test/sql/savepoints.result
index 2f943bd9b..bb4a296fa 100644
--- a/test/sql/savepoints.result
+++ b/test/sql/savepoints.result
@@ -14,15 +14,15 @@ box.execute('pragma sql_default_engine=\''..engine..'\'')
--
box.execute('SAVEPOINT t1;');
---
-- error: Can not set a savepoint in absence of active transaction
+- error: No active transaction
...
box.execute('RELEASE SAVEPOINT t1;');
---
-- error: Can not set a savepoint in absence of active transaction
+- error: No active transaction
...
box.execute('ROLLBACK TO SAVEPOINT t1;');
---
-- error: Can not set a savepoint in absence of active transaction
+- error: No active transaction
...
box.begin() box.execute('SAVEPOINT t1;') box.execute('RELEASE SAVEPOINT t1;') box.commit();
---
--
2.21.0
* [tarantool-patches] Re: [PATCH 01/10] Introduce a txn memory region
2019-04-19 12:43 ` [tarantool-patches] [PATCH 01/10] Introduce a txn memory region Georgy Kirichenko
@ 2019-04-24 18:20 ` Konstantin Osipov
0 siblings, 0 replies; 21+ messages in thread
From: Konstantin Osipov @ 2019-04-24 18:20 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
* Georgy Kirichenko <georgy@tarantool.org> [19/04/19 19:59]:
> Attach a separate memory region to each txn structure in order to store
> all txn internal data until the transaction is finished. This patch is a
> preparation for detaching a txn from a fiber and from the fiber gc storage.
The patch is OK to push.
See two minor comments below.
> struct txn {
> + struct stailq_entry list;
Please rename to in_txn_cache
> + struct region region;
Please provide meaningful comments for both members: e.g. say that
we can have more than one transaction per active fiber in case
of a parallel applier for replication changes, and that we want to detach
a transaction from a fiber to allow interactive transactions.
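For example, something along these lines (only a sketch of the wording;
the first member uses the rename suggested above):

struct txn {
	/**
	 * A link in the cache of free txn objects. A txn can no
	 * longer live on the fiber gc region: with a parallel
	 * applier there may be more than one transaction per
	 * fiber, and a transaction may be detached from its fiber
	 * to allow interactive transactions.
	 */
	struct stailq_entry in_txn_cache;
	/**
	 * A memory region holding the txn object itself and all
	 * its internal data (statements, redo rows, savepoints).
	 * It is truncated back to just the txn object when the
	 * transaction is freed and returned to the cache.
	 */
	struct region region;
	/* ... the rest of the members as before ... */
};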
In theory this patch increases the amount of memory locked to the
runtime pool, but since this is O(1) right now I would ignore
this.
As a follow-up patch, please consider optimizing txn_new() for the
case when a txn is taken from the cache: some of the members are
already cleared by txn_commit()/txn_rollback(), so they do not need to
be set again. This is a very hot path!
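E.g., assuming txn_free() puts the object back into the "empty" state,
txn_begin() could skip most of the re-initialization on a cache hit;
a rough sketch only (field list abbreviated):

struct txn *
txn_begin(bool is_autocommit)
{
	assert(! in_txn());
	bool from_cache = !stailq_empty(&txn_cache);
	struct txn *txn = txn_new();
	if (txn == NULL)
		return NULL;
	if (!from_cache) {
		/*
		 * Only a freshly allocated object needs the full
		 * initialization; a cached one was already left in
		 * this state by txn_commit()/txn_rollback().
		 */
		stailq_create(&txn->stmts);
		txn->n_new_rows = 0;
		txn->n_local_rows = 0;
		txn->n_applier_rows = 0;
		txn->has_triggers = false;
		txn->is_aborted = false;
		txn->in_sub_stmt = 0;
	}
	txn->is_autocommit = is_autocommit;
	/* ... the remaining per-transaction members are set as before ... */
	fiber_set_txn(fiber(), txn);
	return txn;
}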
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
* [tarantool-patches] [PATCH 02/10] Alloc journal entry on a txn memory region
2019-04-19 12:43 [tarantool-patches] [PATCH 00/10] Transaction refactoring Georgy Kirichenko
2019-04-19 12:43 ` [tarantool-patches] [PATCH 01/10] Introduce a txn memory region Georgy Kirichenko
@ 2019-04-19 12:43 ` Georgy Kirichenko
2019-04-24 18:21 ` [tarantool-patches] " Konstantin Osipov
2019-04-19 12:43 ` [tarantool-patches] [PATCH 03/10] Encode a dml statement to a transaction " Georgy Kirichenko
` (7 subsequent siblings)
9 siblings, 1 reply; 21+ messages in thread
From: Georgy Kirichenko @ 2019-04-19 12:43 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
Use the txn memory region to allocate the journal entry structure.
This relaxes the dependency between a journal entry and a fiber.
Prerequisites: #1254
---
src/box/journal.c | 4 ++--
src/box/journal.h | 4 +++-
src/box/txn.c | 3 ++-
src/box/vy_log.c | 2 +-
4 files changed, 8 insertions(+), 5 deletions(-)
diff --git a/src/box/journal.c b/src/box/journal.c
index 7498ba192..fe13fb6ee 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -53,14 +53,14 @@ static struct journal dummy_journal = {
struct journal *current_journal = &dummy_journal;
struct journal_entry *
-journal_entry_new(size_t n_rows)
+journal_entry_new(size_t n_rows, struct region *region)
{
struct journal_entry *entry;
size_t size = (sizeof(struct journal_entry) +
sizeof(entry->rows[0]) * n_rows);
- entry = region_aligned_alloc(&fiber()->gc, size,
+ entry = region_aligned_alloc(region, size,
alignof(struct journal_entry));
if (entry == NULL) {
diag_set(OutOfMemory, size, "region", "struct journal_entry");
diff --git a/src/box/journal.h b/src/box/journal.h
index e52316888..8ac32ee5e 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -72,13 +72,15 @@ struct journal_entry {
struct xrow_header *rows[];
};
+struct region;
+
/**
* Create a new journal entry.
*
* @return NULL if out of memory, fiber diagnostics area is set
*/
struct journal_entry *
-journal_entry_new(size_t n_rows);
+journal_entry_new(size_t n_rows, struct region *region);
/**
* An API for an abstract journal for all transactions of this
diff --git a/src/box/txn.c b/src/box/txn.c
index 815199645..ed5e54bb6 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -332,7 +332,8 @@ txn_write_to_wal(struct txn *txn)
assert(txn->n_new_rows + txn->n_applier_rows > 0);
struct journal_entry *req = journal_entry_new(txn->n_new_rows +
- txn->n_applier_rows);
+ txn->n_applier_rows,
+ &txn->region);
if (req == NULL)
return -1;
diff --git a/src/box/vy_log.c b/src/box/vy_log.c
index be97cdbbe..85059588e 100644
--- a/src/box/vy_log.c
+++ b/src/box/vy_log.c
@@ -772,7 +772,7 @@ vy_log_flush(void)
tx_size++;
size_t used = region_used(&fiber()->gc);
- struct journal_entry *entry = journal_entry_new(tx_size);
+ struct journal_entry *entry = journal_entry_new(tx_size, &fiber()->gc);
if (entry == NULL)
goto err;
--
2.21.0
* [tarantool-patches] [PATCH 03/10] Encode a dml statement to a transaction memory region
2019-04-19 12:43 [tarantool-patches] [PATCH 00/10] Transaction refactoring Georgy Kirichenko
2019-04-19 12:43 ` [tarantool-patches] [PATCH 01/10] Introduce a txn memory region Georgy Kirichenko
2019-04-19 12:43 ` [tarantool-patches] [PATCH 02/10] Alloc journal entry on " Georgy Kirichenko
@ 2019-04-19 12:43 ` Georgy Kirichenko
2019-04-24 18:28 ` [tarantool-patches] " Konstantin Osipov
2019-04-19 12:44 ` [tarantool-patches] [PATCH 04/10] Get rid of autocommit from a txn structure Georgy Kirichenko
` (6 subsequent siblings)
9 siblings, 1 reply; 21+ messages in thread
From: Georgy Kirichenko @ 2019-04-19 12:43 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
Encode all statements to be written out to WAL on the transaction
memory region. This relaxes the coupling between transaction and fiber
state and is required for the autonomous transaction feature.
Prerequisites: #1254
---
src/box/request.c | 2 +-
src/box/txn.c | 2 +-
src/box/vy_log.c | 2 +-
src/box/vy_stmt.c | 6 ++++--
src/box/xrow.c | 21 +++++++++++++++------
src/box/xrow.h | 12 +++++++++---
6 files changed, 31 insertions(+), 14 deletions(-)
diff --git a/src/box/request.c b/src/box/request.c
index 9c684af73..9edd1f8b1 100644
--- a/src/box/request.c
+++ b/src/box/request.c
@@ -61,7 +61,7 @@ request_update_header(struct request *request, struct xrow_header *row)
if (row == NULL)
return 0;
row->type = request->type;
- row->bodycnt = xrow_encode_dml(request, row->body);
+ row->bodycnt = xrow_encode_dml(request, &fiber()->gc, false, row->body);
if (row->bodycnt < 0)
return -1;
request->header = row;
diff --git a/src/box/txn.c b/src/box/txn.c
index ed5e54bb6..ffada5984 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -68,7 +68,7 @@ txn_add_redo(struct txn *txn, struct txn_stmt *stmt, struct request *request)
row->lsn = 0;
row->sync = 0;
row->tm = 0;
- row->bodycnt = xrow_encode_dml(request, row->body);
+ row->bodycnt = xrow_encode_dml(request, &txn->region, true, row->body);
if (row->bodycnt < 0)
return -1;
stmt->row = row;
diff --git a/src/box/vy_log.c b/src/box/vy_log.c
index 85059588e..76b21c822 100644
--- a/src/box/vy_log.c
+++ b/src/box/vy_log.c
@@ -520,7 +520,7 @@ vy_log_record_encode(const struct vy_log_record *record,
req.tuple_end = pos;
memset(row, 0, sizeof(*row));
row->type = req.type;
- row->bodycnt = xrow_encode_dml(&req, row->body);
+ row->bodycnt = xrow_encode_dml(&req, &fiber()->gc, false, row->body);
return 0;
}
diff --git a/src/box/vy_stmt.c b/src/box/vy_stmt.c
index e1cdd293d..3bc22965c 100644
--- a/src/box/vy_stmt.c
+++ b/src/box/vy_stmt.c
@@ -681,7 +681,8 @@ vy_stmt_encode_primary(struct tuple *value, struct key_def *key_def,
}
if (vy_stmt_meta_encode(value, &request, true) != 0)
return -1;
- xrow->bodycnt = xrow_encode_dml(&request, xrow->body);
+ xrow->bodycnt = xrow_encode_dml(&request, &fiber()->gc, false,
+ xrow->body);
if (xrow->bodycnt < 0)
return -1;
return 0;
@@ -715,7 +716,8 @@ vy_stmt_encode_secondary(struct tuple *value, struct key_def *cmp_def,
}
if (vy_stmt_meta_encode(value, &request, false) != 0)
return -1;
- xrow->bodycnt = xrow_encode_dml(&request, xrow->body);
+ xrow->bodycnt = xrow_encode_dml(&request, &fiber()->gc, false,
+ xrow->body);
if (xrow->bodycnt < 0)
return -1;
else
diff --git a/src/box/xrow.c b/src/box/xrow.c
index aaed84e38..1fc60f6d8 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -746,15 +746,19 @@ request_str(const struct request *request)
}
int
-xrow_encode_dml(const struct request *request, struct iovec *iov)
+xrow_encode_dml(const struct request *request, struct region *region,
+ bool copy_tuple, struct iovec *iov)
{
int iovcnt = 1;
const int MAP_LEN_MAX = 40;
uint32_t key_len = request->key_end - request->key;
uint32_t ops_len = request->ops_end - request->ops;
uint32_t tuple_meta_len = request->tuple_meta_end - request->tuple_meta;
- uint32_t len = MAP_LEN_MAX + key_len + ops_len + tuple_meta_len;
- char *begin = (char *) region_alloc(&fiber()->gc, len);
+ uint32_t tuple_len = copy_tuple ?
+ (request->tuple_end - request->tuple) : 0;
+ uint32_t len = MAP_LEN_MAX + key_len + ops_len + tuple_meta_len +
+ tuple_len;
+ char *begin = (char *) region_alloc(region, len);
if (begin == NULL) {
diag_set(OutOfMemory, len, "region_alloc", "begin");
return -1;
@@ -796,9 +800,14 @@ xrow_encode_dml(const struct request *request, struct iovec *iov)
}
if (request->tuple) {
pos = mp_encode_uint(pos, IPROTO_TUPLE);
- iov[iovcnt].iov_base = (void *) request->tuple;
- iov[iovcnt].iov_len = (request->tuple_end - request->tuple);
- iovcnt++;
+ if (copy_tuple) {
+ memcpy(pos, request->tuple, tuple_len);
+ pos += tuple_len;
+ } else {
+ iov[iovcnt].iov_base = (void *) request->tuple;
+ iov[iovcnt].iov_len = (request->tuple_end - request->tuple);
+ iovcnt++;
+ }
map_size++;
}
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 3a8cba191..c1d737efd 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -54,6 +54,8 @@ enum {
IPROTO_SELECT_HEADER_LEN = IPROTO_HEADER_LEN + 7,
};
+struct region;
+
struct xrow_header {
/* (!) Please update txn_add_redo() after changing members */
@@ -195,12 +197,15 @@ xrow_decode_dml(struct xrow_header *xrow, struct request *request,
/**
* Encode the request fields to iovec using region_alloc().
* @param request request to encode
+ * @param region region to encode
+ * @param copy_tuple if true then tuple is going to be copied to the region
* @param iov[out] iovec to fill
* @retval -1 on error, see diag
* @retval > 0 the number of iovecs used
*/
int
-xrow_encode_dml(const struct request *request, struct iovec *iov);
+xrow_encode_dml(const struct request *request, struct region *region,
+ bool copy_tuple, struct iovec *iov);
/**
* CALL/EVAL request.
@@ -713,9 +718,10 @@ xrow_decode_dml_xc(struct xrow_header *row, struct request *request,
/** @copydoc xrow_encode_dml. */
static inline int
-xrow_encode_dml_xc(const struct request *request, struct iovec *iov)
+xrow_encode_dml_xc(const struct request *request, struct region *region,
+ bool copy_tuple, struct iovec *iov)
{
- int iovcnt = xrow_encode_dml(request, iov);
+ int iovcnt = xrow_encode_dml(request, region, copy_tuple, iov);
if (iovcnt < 0)
diag_raise();
return iovcnt;
--
2.21.0
* [tarantool-patches] Re: [PATCH 03/10] Encode a dml statement to a transaction memory region
2019-04-19 12:43 ` [tarantool-patches] [PATCH 03/10] Encode a dml statement to a transaction " Georgy Kirichenko
@ 2019-04-24 18:28 ` Konstantin Osipov
0 siblings, 0 replies; 21+ messages in thread
From: Konstantin Osipov @ 2019-04-24 18:28 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
* Georgy Kirichenko <georgy@tarantool.org> [19/04/19 19:59]:
> Prerequisites: #1254
> ---
> src/box/request.c | 2 +-
> src/box/txn.c | 2 +-
> src/box/vy_log.c | 2 +-
> src/box/vy_stmt.c | 6 ++++--
> src/box/xrow.c | 21 +++++++++++++++------
> src/box/xrow.h | 12 +++++++++---
> 6 files changed, 31 insertions(+), 14 deletions(-)
>
> diff --git a/src/box/request.c b/src/box/request.c
> index 9c684af73..9edd1f8b1 100644
> --- a/src/box/request.c
> +++ b/src/box/request.c
> @@ -61,7 +61,7 @@ request_update_header(struct request *request, struct xrow_header *row)
> if (row == NULL)
> return 0;
> row->type = request->type;
> - row->bodycnt = xrow_encode_dml(request, row->body);
> + row->bodycnt = xrow_encode_dml(request, &fiber()->gc, false, row->body);
I don't understand the business with copy_tuple. Why do you need
it? Why not always copy the tuple? Why not have a separate wrapper
which would copy the body, e.g. xrow_encode_dml_and_copy_body()?
request_update_header() should be using txn memory, I don't see
why it uses region memory in your patch. I haven't seen the
subsequent patches, I hope it is addressed there. If not, please
do.
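FWIW, such a wrapper could be as thin as this (a sketch only; just the
name is taken from the comment above):

/**
 * Same as xrow_encode_dml(), but the tuple body is copied onto
 * @a region as well, so the resulting iovec does not reference
 * the caller's memory.
 */
static inline int
xrow_encode_dml_and_copy_body(const struct request *request,
			      struct region *region, struct iovec *iov)
{
	return xrow_encode_dml(request, region, true, iov);
}

Then txn_add_redo() would call

	row->bodycnt = xrow_encode_dml_and_copy_body(request, &txn->region,
						     row->body);

while the other call sites keep passing copy_tuple = false to
xrow_encode_dml() directly.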
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
* [tarantool-patches] [PATCH 04/10] Get rid of autocommit from a txn structure
2019-04-19 12:43 [tarantool-patches] [PATCH 00/10] Transaction refactoring Georgy Kirichenko
` (2 preceding siblings ...)
2019-04-19 12:43 ` [tarantool-patches] [PATCH 03/10] Encode a dml statement to a transaction " Georgy Kirichenko
@ 2019-04-19 12:44 ` Georgy Kirichenko
2019-04-24 19:07 ` [tarantool-patches] " Konstantin Osipov
2019-04-19 12:44 ` [tarantool-patches] [PATCH 05/10] Get rid of fiber_gc from txn_rollback Georgy Kirichenko
` (5 subsequent siblings)
9 siblings, 1 reply; 21+ messages in thread
From: Georgy Kirichenko @ 2019-04-19 12:44 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
Move transaction auto-start and auto-commit behavior to the box level.
From now on, a transaction won't start or commit automatically without
explicit txn_begin/txn_commit invocations. This is part of a bigger
transaction refactoring aimed at implementing detachable transactions
and a parallel applier.
Prerequisites: #1254
---
src/box/applier.cc | 27 ++++++++++++---
src/box/box.cc | 76 +++++++++++++++++++++++++++++++-----------
src/box/memtx_engine.c | 10 ++++--
src/box/txn.c | 25 ++++----------
src/box/txn.h | 7 +---
src/box/vinyl.c | 12 +++----
6 files changed, 99 insertions(+), 58 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 373e1feb9..3b74e0f54 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -172,11 +172,22 @@ applier_writer_f(va_list ap)
static int
apply_initial_join_row(struct xrow_header *row)
{
+ struct txn *txn = txn_begin();
+ if (txn == NULL)
+ return -1;
struct request request;
xrow_decode_dml(row, &request, dml_request_key_map(row->type));
- struct space *space = space_cache_find_xc(request.space_id);
+ struct space *space = space_cache_find(request.space_id);
+ if (space == NULL)
+ return -1;
/* no access checks here - applier always works with admin privs */
- return space_apply_initial_join_row(space, &request);
+ if (space_apply_initial_join_row(space, &request)) {
+ txn_rollback();
+ return -1;
+ }
+ int rc = txn_commit(txn);
+ fiber_gc();
+ return rc;
}
/**
@@ -403,8 +414,16 @@ applier_join(struct applier *applier)
applier->last_row_time = ev_monotonic_now(loop());
if (iproto_type_is_dml(row.type)) {
vclock_follow_xrow(&replicaset.vclock, &row);
- if (apply_row(&row) != 0)
+ struct txn *txn = txn_begin();
+ if (txn == NULL)
+ diag_raise();
+ if (apply_row(&row) != 0) {
+ txn_rollback();
+ diag_raise();
+ }
+ if (txn_commit(txn) != 0)
diag_raise();
+ fiber_gc();
if (++row_count % 100000 == 0)
say_info("%.1fM rows received", row_count / 1e6);
} else if (row.type == IPROTO_OK) {
@@ -555,7 +574,7 @@ applier_apply_tx(struct stailq *rows)
* conflict safely access failed xrow object and allocate
* IPROTO_NOP on gc.
*/
- struct txn *txn = txn_begin(false);
+ struct txn *txn = txn_begin();
struct applier_tx_row *item;
if (txn == NULL)
diag_raise();
diff --git a/src/box/box.cc b/src/box/box.cc
index 7828f575b..54210474f 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -169,34 +169,54 @@ int
box_process_rw(struct request *request, struct space *space,
struct tuple **result)
{
+ struct tuple *tuple = NULL;
+ struct txn *txn = in_txn();
+ bool autocommit = txn == NULL;
+ if (txn == NULL && (txn = txn_begin()) == NULL)
+ return -1;
assert(iproto_type_is_dml(request->type));
rmean_collect(rmean_box, request->type, 1);
if (access_check_space(space, PRIV_W) != 0)
- return -1;
- struct txn *txn = txn_begin_stmt(space);
- if (txn == NULL)
- return -1;
- struct tuple *tuple;
+ goto fail;
+ if (txn_begin_stmt(space) == NULL)
+ goto fail;
if (space_execute_dml(space, txn, request, &tuple) != 0) {
txn_rollback_stmt();
- return -1;
+ goto fail;
}
- if (result == NULL)
- return txn_commit_stmt(txn, request);
- *result = tuple;
- if (tuple == NULL)
- return txn_commit_stmt(txn, request);
/*
* Pin the tuple locally before the commit,
* otherwise it may go away during yield in
* when WAL is written in autocommit mode.
*/
- tuple_ref(tuple);
- int rc = txn_commit_stmt(txn, request);
- if (rc == 0)
+ if (tuple != NULL)
+ tuple_ref(tuple);
+
+ if (result != NULL)
+ *result = tuple;
+
+ if (txn_commit_stmt(txn, request) != 0)
+ goto fail;
+ if (autocommit) {
+ if (txn_commit(txn) != 0)
+ goto fail;
+ fiber_gc();
+ }
+
+ if (tuple != NULL && result != NULL) {
tuple_bless(tuple);
- tuple_unref(tuple);
- return rc;
+ }
+
+ if (tuple != NULL)
+ tuple_unref(tuple);
+ return 0;
+
+fail:
+ if (autocommit)
+ txn_rollback();
+ if (tuple != NULL)
+ tuple_unref(tuple);
+ return -1;
}
void
@@ -299,8 +319,12 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row)
xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
if (request.type != IPROTO_NOP) {
struct space *space = space_cache_find_xc(request.space_id);
- if (box_process_rw(&request, space, NULL) != 0) {
+ struct txn *txn = txn_begin();
+ if (txn == NULL || box_process_rw(&request, space, NULL) != 0 ||
+ txn_commit(txn) != 0) {
say_error("error applying row: %s", request_str(&request));
+ if (txn != NULL)
+ txn_rollback();
diag_raise();
}
}
@@ -1313,8 +1337,15 @@ box_sequence_reset(uint32_t seq_id)
static inline void
box_register_replica(uint32_t id, const struct tt_uuid *uuid)
{
+ struct txn *txn = txn_begin();
+ if (txn == NULL)
+ diag_raise();
if (boxk(IPROTO_INSERT, BOX_CLUSTER_ID, "[%u%s]",
- (unsigned) id, tt_uuid_str(uuid)) != 0)
+ (unsigned) id, tt_uuid_str(uuid)) != 0) {
+ txn_rollback();
+ diag_raise();
+ }
+ if (txn_commit(txn) != 0)
diag_raise();
assert(replica_by_uuid(uuid)->id == id);
}
@@ -1636,9 +1667,16 @@ box_set_replicaset_uuid(const struct tt_uuid *replicaset_uuid)
uu = *replicaset_uuid;
else
tt_uuid_create(&uu);
+ struct txn *txn = txn_begin();
+ if (txn == NULL)
+ diag_raise();
/* Save replica set UUID in _schema */
if (boxk(IPROTO_INSERT, BOX_SCHEMA_ID, "[%s%s]", "cluster",
- tt_uuid_str(&uu)))
+ tt_uuid_str(&uu))) {
+ txn_rollback();
+ diag_raise();
+ }
+ if (txn_commit(txn) != 0)
diag_raise();
}
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index 4d99910cb..afa1739c6 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -272,16 +272,22 @@ memtx_engine_recover_snapshot_row(struct memtx_engine *memtx,
diag_set(ClientError, ER_CROSS_ENGINE_TRANSACTION);
return -1;
}
+ struct txn *txn = txn_begin();
+ if (txn == NULL)
+ return -1;
/* no access checks here - applier always works with admin privs */
- if (space_apply_initial_join_row(space, &request) != 0)
+ if (space_apply_initial_join_row(space, &request) != 0) {
+ txn_rollback();
return -1;
+ }
+ int rc = txn_commit(txn);
/*
* Don't let gc pool grow too much. Yet to
* it before reading the next row, to make
* sure it's not freed along here.
*/
fiber_gc();
- return 0;
+ return rc;
}
/** Called at start to tell memtx to recover to a given LSN. */
diff --git a/src/box/txn.c b/src/box/txn.c
index ffada5984..804962767 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -173,7 +173,7 @@ txn_free(struct txn *txn)
}
struct txn *
-txn_begin(bool is_autocommit)
+txn_begin()
{
static int64_t tsn = 0;
assert(! in_txn());
@@ -186,7 +186,6 @@ txn_begin(bool is_autocommit)
txn->n_new_rows = 0;
txn->n_local_rows = 0;
txn->n_applier_rows = 0;
- txn->is_autocommit = is_autocommit;
txn->has_triggers = false;
txn->is_aborted = false;
txn->in_sub_stmt = 0;
@@ -224,19 +223,15 @@ txn_begin_stmt(struct space *space)
{
struct txn *txn = in_txn();
if (txn == NULL) {
- txn = txn_begin(true);
- if (txn == NULL)
- return NULL;
+ diag_set(ClientError, ER_NO_TRANSACTION);
+ return NULL;
} else if (txn->in_sub_stmt > TXN_SUB_STMT_MAX) {
diag_set(ClientError, ER_SUB_STMT_MAX);
return NULL;
}
struct txn_stmt *stmt = txn_stmt_new(txn);
- if (stmt == NULL) {
- if (txn->is_autocommit && txn->in_sub_stmt == 0)
- txn_rollback();
+ if (stmt == NULL)
return NULL;
- }
if (space == NULL)
return txn;
@@ -271,8 +266,7 @@ txn_is_distributed(struct txn *txn)
}
/**
- * End a statement. In autocommit mode, end
- * the current transaction as well.
+ * End a statement.
*/
int
txn_commit_stmt(struct txn *txn, struct request *request)
@@ -315,11 +309,6 @@ txn_commit_stmt(struct txn *txn, struct request *request)
goto fail;
}
--txn->in_sub_stmt;
- if (txn->is_autocommit && txn->in_sub_stmt == 0) {
- int rc = txn_commit(txn);
- fiber_gc();
- return rc;
- }
return 0;
fail:
txn_rollback_stmt();
@@ -446,8 +435,6 @@ txn_rollback_stmt()
if (txn == NULL || txn->in_sub_stmt == 0)
return;
txn->in_sub_stmt--;
- if (txn->is_autocommit && txn->in_sub_stmt == 0)
- return txn_rollback();
txn_rollback_to_svp(txn, txn->sub_stmt_begin[txn->in_sub_stmt]);
}
@@ -518,7 +505,7 @@ box_txn_begin()
diag_set(ClientError, ER_ACTIVE_TRANSACTION);
return -1;
}
- if (txn_begin(false) == NULL)
+ if (txn_begin() == NULL)
return -1;
return 0;
}
diff --git a/src/box/txn.h b/src/box/txn.h
index fe8a299f2..7f999b8e9 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -154,11 +154,6 @@ struct txn {
* already assigned LSN.
*/
int n_applier_rows;
- /**
- * True if this transaction is running in autocommit mode
- * (statement end causes an automatic transaction commit).
- */
- bool is_autocommit;
/**
* True if the transaction was aborted so should be
* rolled back at commit.
@@ -206,7 +201,7 @@ in_txn()
* @pre no transaction is active
*/
struct txn *
-txn_begin(bool is_autocommit);
+txn_begin();
/**
* Commit a transaction.
diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index 41918beba..fe258e784 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -2399,10 +2399,8 @@ vinyl_engine_begin(struct engine *engine, struct txn *txn)
txn->engine_tx = vy_tx_begin(env->xm);
if (txn->engine_tx == NULL)
return -1;
- if (!txn->is_autocommit) {
- trigger_create(&txn->fiber_on_stop, txn_on_stop, NULL, NULL);
- trigger_add(&fiber()->on_stop, &txn->fiber_on_stop);
- }
+ trigger_create(&txn->fiber_on_stop, txn_on_stop, NULL, NULL);
+ trigger_add(&fiber()->on_stop, &txn->fiber_on_stop);
return 0;
}
@@ -2472,8 +2470,7 @@ vinyl_engine_commit(struct engine *engine, struct txn *txn)
vy_regulator_check_dump_watermark(&env->regulator);
txn->engine_tx = NULL;
- if (!txn->is_autocommit)
- trigger_clear(&txn->fiber_on_stop);
+ trigger_clear(&txn->fiber_on_stop);
}
static void
@@ -2487,8 +2484,7 @@ vinyl_engine_rollback(struct engine *engine, struct txn *txn)
vy_tx_rollback(tx);
txn->engine_tx = NULL;
- if (!txn->is_autocommit)
- trigger_clear(&txn->fiber_on_stop);
+ trigger_clear(&txn->fiber_on_stop);
}
static int
--
2.21.0
* [tarantool-patches] Re: [PATCH 04/10] Get rid of autocommit from a txn structure
2019-04-19 12:44 ` [tarantool-patches] [PATCH 04/10] Get rid of autocommit from a txn structure Georgy Kirichenko
@ 2019-04-24 19:07 ` Konstantin Osipov
0 siblings, 0 replies; 21+ messages in thread
From: Konstantin Osipov @ 2019-04-24 19:07 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
* Georgy Kirichenko <georgy@tarantool.org> [19/04/19 19:59]:
> box_process_rw(struct request *request, struct space *space,
> struct tuple **result)
> {
> + struct tuple *tuple = NULL;
> + struct txn *txn = in_txn();
> + bool autocommit = txn == NULL;
> + if (txn == NULL && (txn = txn_begin()) == NULL)
nit: why not use if (autocommit && (txn = txn_begin()) == NULL)
instead
> + return -1;
> assert(iproto_type_is_dml(request->type));
> rmean_collect(rmean_box, request->type, 1);
> if (access_check_space(space, PRIV_W) != 0)
> - return -1;
> - struct txn *txn = txn_begin_stmt(space);
> - if (txn == NULL)
> - return -1;
> - struct tuple *tuple;
> + goto fail;
> + if (txn_begin_stmt(space) == NULL)
> + goto fail;
I guess txn_begin_stmt can as well receive txn explicitly now,
since we always fetch fiber->txn before calling txn_begin_stmt().
This will save us one fiber key fetch.
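I.e., something like

	struct txn *
	txn_begin_stmt(struct txn *txn, struct space *space);

with box_process_rw() passing the txn it has already looked up.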
> if (space_execute_dml(space, txn, request, &tuple) != 0) {
> txn_rollback_stmt();
> - return -1;
> + goto fail;
> }
> - if (result == NULL)
> - return txn_commit_stmt(txn, request);
> - *result = tuple;
> - if (tuple == NULL)
> - return txn_commit_stmt(txn, request);
> /*
> * Pin the tuple locally before the commit,
> * otherwise it may go away during yield in
> * when WAL is written in autocommit mode.
> */
> - tuple_ref(tuple);
> - int rc = txn_commit_stmt(txn, request);
> - if (rc == 0)
> + if (tuple != NULL)
> + tuple_ref(tuple);
> +
> + if (result != NULL)
> + *result = tuple;
> +
> + if (txn_commit_stmt(txn, request) != 0)
> + goto fail;
> + if (autocommit) {
> + if (txn_commit(txn) != 0)
> + goto fail;
> + fiber_gc();
> + }
> +
> + if (tuple != NULL && result != NULL) {
> tuple_bless(tuple);
> - tuple_unref(tuple);
The code here now looks quite messy. I think it would be easier to
read if we split it between if (autocommit) ... else ... branches:
if (tuple == NULL) {
	return is_autocommit ? txn_commit_stmt() : txn_commit();
}
tuple_ref(tuple);
if (txn_commit_stmt(txn, request))
	goto fail;
if (is_autocommit) {
	if (txn_commit())
		goto fail;
	fiber_gc();
}
tuple_bless(tuple);
if (result)
	*result = tuple;
tuple_unref(tuple);
return 0;
The patch is generally looking good, please fix the above nits and
it will be good to go.
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
* [tarantool-patches] [PATCH 05/10] Get rid of fiber_gc from txn_rollback
2019-04-19 12:43 [tarantool-patches] [PATCH 00/10] Transaction refactoring Georgy Kirichenko
` (3 preceding siblings ...)
2019-04-19 12:44 ` [tarantool-patches] [PATCH 04/10] Get rid of autocommit from a txn structure Georgy Kirichenko
@ 2019-04-19 12:44 ` Georgy Kirichenko
2019-04-24 19:12 ` [tarantool-patches] " Konstantin Osipov
2019-04-19 12:44 ` [tarantool-patches] [PATCH 06/10] Require for txn in case of txn_begin_stmt/txn_rollback_stmt Georgy Kirichenko
` (4 subsequent siblings)
9 siblings, 1 reply; 21+ messages in thread
From: Georgy Kirichenko @ 2019-04-19 12:44 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
Don't touch the fiber gc storage on a transaction rollback. This relaxes
the dependency between the fiber and transaction life cycles.
Prerequisites: #1254
---
src/box/applier.cc | 9 ++++++---
src/box/box.cc | 24 +++++++++++++++++-------
src/box/call.c | 16 ++++++++++++----
src/box/memtx_engine.c | 3 ++-
src/box/txn.c | 22 +++++++++++-----------
src/box/txn.h | 7 +++++--
src/box/vy_scheduler.c | 6 ++++--
7 files changed, 57 insertions(+), 30 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 3b74e0f54..42b6efc4d 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -182,7 +182,8 @@ apply_initial_join_row(struct xrow_header *row)
return -1;
/* no access checks here - applier always works with admin privs */
if (space_apply_initial_join_row(space, &request)) {
- txn_rollback();
+ txn_rollback(txn);
+ fiber_gc();
return -1;
}
int rc = txn_commit(txn);
@@ -418,7 +419,8 @@ applier_join(struct applier *applier)
if (txn == NULL)
diag_raise();
if (apply_row(&row) != 0) {
- txn_rollback();
+ txn_rollback(txn);
+ fiber_gc();
diag_raise();
}
if (txn_commit(txn) != 0)
@@ -621,7 +623,8 @@ applier_apply_tx(struct stailq *rows)
return txn_commit(txn);
rollback:
- txn_rollback();
+ txn_rollback(txn);
+ fiber_gc();
return -1;
}
diff --git a/src/box/box.cc b/src/box/box.cc
index 54210474f..835a00c95 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -197,9 +197,12 @@ box_process_rw(struct request *request, struct space *space,
if (txn_commit_stmt(txn, request) != 0)
goto fail;
+
if (autocommit) {
- if (txn_commit(txn) != 0)
+ if (txn_commit(txn) != 0) {
+ txn = NULL;
goto fail;
+ }
fiber_gc();
}
@@ -212,8 +215,11 @@ box_process_rw(struct request *request, struct space *space,
return 0;
fail:
- if (autocommit)
- txn_rollback();
+ if (autocommit) {
+ if (txn != NULL)
+ txn_rollback(txn);
+ fiber_gc();
+ }
if (tuple != NULL)
tuple_unref(tuple);
return -1;
@@ -323,8 +329,10 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row)
if (txn == NULL || box_process_rw(&request, space, NULL) != 0 ||
txn_commit(txn) != 0) {
say_error("error applying row: %s", request_str(&request));
- if (txn != NULL)
- txn_rollback();
+ if (txn != NULL) {
+ txn_rollback(txn);
+ fiber_gc();
+ }
diag_raise();
}
}
@@ -1342,7 +1350,8 @@ box_register_replica(uint32_t id, const struct tt_uuid *uuid)
diag_raise();
if (boxk(IPROTO_INSERT, BOX_CLUSTER_ID, "[%u%s]",
(unsigned) id, tt_uuid_str(uuid)) != 0) {
- txn_rollback();
+ txn_rollback(txn);
+ fiber_gc();
diag_raise();
}
if (txn_commit(txn) != 0)
@@ -1673,7 +1682,8 @@ box_set_replicaset_uuid(const struct tt_uuid *replicaset_uuid)
/* Save replica set UUID in _schema */
if (boxk(IPROTO_INSERT, BOX_SCHEMA_ID, "[%s%s]", "cluster",
tt_uuid_str(&uu))) {
- txn_rollback();
+ txn_rollback(txn);
+ fiber_gc();
diag_raise();
}
if (txn_commit(txn) != 0)
diff --git a/src/box/call.c b/src/box/call.c
index b9750c5f3..4b5f155df 100644
--- a/src/box/call.c
+++ b/src/box/call.c
@@ -208,13 +208,17 @@ box_process_call(struct call_request *request, struct port *port)
fiber_set_user(fiber(), orig_credentials);
if (rc != 0) {
- txn_rollback();
+ if (in_txn() != NULL)
+ txn_rollback(in_txn());
+ fiber_gc();
return -1;
}
if (in_txn()) {
diag_set(ClientError, ER_FUNCTION_TX_ACTIVE);
- txn_rollback();
+ if (in_txn() != NULL)
+ txn_rollback(in_txn());
+ fiber_gc();
return -1;
}
@@ -229,13 +233,17 @@ box_process_eval(struct call_request *request, struct port *port)
if (access_check_universe(PRIV_X) != 0)
return -1;
if (box_lua_eval(request, port) != 0) {
- txn_rollback();
+ if (in_txn() != NULL)
+ txn_rollback(in_txn());
+ fiber_gc();
return -1;
}
if (in_txn()) {
diag_set(ClientError, ER_FUNCTION_TX_ACTIVE);
- txn_rollback();
+ if (in_txn() != NULL)
+ txn_rollback(in_txn());
+ fiber_gc();
return -1;
}
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index afa1739c6..cf2205c68 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -277,7 +277,8 @@ memtx_engine_recover_snapshot_row(struct memtx_engine *memtx,
return -1;
/* no access checks here - applier always works with admin privs */
if (space_apply_initial_join_row(space, &request) != 0) {
- txn_rollback();
+ txn_rollback(txn);
+ fiber_gc();
return -1;
}
int rc = txn_commit(txn);
diff --git a/src/box/txn.c b/src/box/txn.c
index 804962767..fdfffa144 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -347,7 +347,7 @@ txn_write_to_wal(struct txn *txn)
if (res < 0) {
/* Cascading rollback. */
- txn_rollback(); /* Perform our part of cascading rollback. */
+ txn_rollback(txn); /* Perform our part of cascading rollback. */
/*
* Move fiber to end of event loop to avoid
* execution of any new requests before all
@@ -396,7 +396,7 @@ txn_commit(struct txn *txn)
if (txn->n_new_rows + txn->n_applier_rows > 0) {
txn->signature = txn_write_to_wal(txn);
if (txn->signature < 0)
- goto fail;
+ return -1;
}
/*
* The transaction is in the binary log. No action below
@@ -424,7 +424,7 @@ txn_commit(struct txn *txn)
txn_free(txn);
return 0;
fail:
- txn_rollback();
+ txn_rollback(txn);
return -1;
}
@@ -439,11 +439,9 @@ txn_rollback_stmt()
}
void
-txn_rollback()
+txn_rollback(struct txn *txn)
{
- struct txn *txn = in_txn();
- if (txn == NULL)
- return;
+ assert(txn == in_txn());
/* Rollback triggers must not throw. */
if (txn->has_triggers &&
trigger_run(&txn->on_rollback, txn) != 0) {
@@ -458,8 +456,6 @@ txn_rollback()
stailq_foreach_entry(stmt, &txn->stmts, next)
txn_stmt_unref_tuples(stmt);
- /** Free volatile txn memory. */
- fiber_gc();
fiber_set_txn(fiber(), NULL);
txn_free(txn);
}
@@ -535,11 +531,14 @@ int
box_txn_rollback()
{
struct txn *txn = in_txn();
+ if (txn == NULL)
+ return 0;
if (txn && txn->in_sub_stmt) {
diag_set(ClientError, ER_ROLLBACK_IN_SUB_STMT);
return -1;
}
- txn_rollback(); /* doesn't throw */
+ txn_rollback(txn); /* doesn't throw */
+ fiber_gc();
return 0;
}
@@ -617,6 +616,7 @@ txn_on_stop(struct trigger *trigger, void *event)
{
(void) trigger;
(void) event;
- txn_rollback(); /* doesn't yield or fail */
+ txn_rollback(in_txn()); /* doesn't yield or fail */
+
}
diff --git a/src/box/txn.h b/src/box/txn.h
index 7f999b8e9..ae2d7b9a5 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -213,9 +213,12 @@ txn_begin();
int
txn_commit(struct txn *txn);
-/** Rollback a transaction, if any. */
+/**
+ * Rollback a transaction.
+ * @pre txn == in_txn()
+ */
void
-txn_rollback();
+txn_rollback(struct txn *txn);
/**
* Roll back the transaction but keep the object around.
diff --git a/src/box/vy_scheduler.c b/src/box/vy_scheduler.c
index fabb4bb48..1f6b30f4a 100644
--- a/src/box/vy_scheduler.c
+++ b/src/box/vy_scheduler.c
@@ -889,8 +889,10 @@ vy_deferred_delete_batch_process_f(struct cmsg *cmsg)
for (int i = 0; i < batch->count; i++) {
if (vy_deferred_delete_process_one(deferred_delete_space,
pk->space_id, pk->mem_format,
- &batch->stmt[i]) != 0)
+ &batch->stmt[i]) != 0) {
+ txn_rollback(txn);
goto fail;
+ }
}
if (txn_commit(txn) != 0)
@@ -900,7 +902,7 @@ vy_deferred_delete_batch_process_f(struct cmsg *cmsg)
fail:
batch->is_failed = true;
diag_move(diag_get(), &batch->diag);
- txn_rollback();
+ fiber_gc();
}
/**
--
2.21.0
* [tarantool-patches] Re: [PATCH 05/10] Get rid of fiber_gc from txn_rollback
2019-04-19 12:44 ` [tarantool-patches] [PATCH 05/10] Get rid of fiber_gc from txn_rollback Georgy Kirichenko
@ 2019-04-24 19:12 ` Konstantin Osipov
0 siblings, 0 replies; 21+ messages in thread
From: Konstantin Osipov @ 2019-04-24 19:12 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
* Georgy Kirichenko <georgy@tarantool.org> [19/04/19 19:59]:
The patch is LGTM, please see one comment below.
> index fabb4bb48..1f6b30f4a 100644
> --- a/src/box/vy_scheduler.c
> +++ b/src/box/vy_scheduler.c
> @@ -889,8 +889,10 @@ vy_deferred_delete_batch_process_f(struct cmsg *cmsg)
> for (int i = 0; i < batch->count; i++) {
> if (vy_deferred_delete_process_one(deferred_delete_space,
> pk->space_id, pk->mem_format,
> - &batch->stmt[i]) != 0)
> + &batch->stmt[i]) != 0) {
> + txn_rollback(txn);
> goto fail;
> + }
> }
>
I would add a new label, fail_rollback, this would be more in line
with the vinyl style.
> if (txn_commit(txn) != 0)
> @@ -900,7 +902,7 @@ vy_deferred_delete_batch_process_f(struct cmsg *cmsg)
> fail:
> batch->is_failed = true;
> diag_move(diag_get(), &batch->diag);
> - txn_rollback();
> + fiber_gc();
> }
>
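I.e., roughly (a sketch only; the success path between the commit and
the labels is elided):

	for (int i = 0; i < batch->count; i++) {
		if (vy_deferred_delete_process_one(deferred_delete_space,
						   pk->space_id, pk->mem_format,
						   &batch->stmt[i]) != 0)
			goto fail_rollback;
	}
	if (txn_commit(txn) != 0)
		goto fail;
	/* ... rest of the success path as before ... */
	return;
fail_rollback:
	txn_rollback(txn);
fail:
	batch->is_failed = true;
	diag_move(diag_get(), &batch->diag);
	fiber_gc();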
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
* [tarantool-patches] [PATCH 06/10] Require for txn in case of txn_begin_stmt/txn_rollback_stmt
2019-04-19 12:43 [tarantool-patches] [PATCH 00/10] Transaction refactoring Georgy Kirichenko
` (4 preceding siblings ...)
2019-04-19 12:44 ` [tarantool-patches] [PATCH 05/10] Get rid of fiber_gc from txn_rollback Georgy Kirichenko
@ 2019-04-19 12:44 ` Georgy Kirichenko
2019-04-24 19:13 ` [tarantool-patches] " Konstantin Osipov
2019-04-19 12:44 ` [tarantool-patches] [PATCH 07/10] Remove fiber from a journal_entry structure Georgy Kirichenko
` (3 subsequent siblings)
9 siblings, 1 reply; 21+ messages in thread
From: Georgy Kirichenko @ 2019-04-19 12:44 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
Pass a txn structure to the txn_begin_stmt and txn_rollback_stmt functions.
Prerequisites: #1254
---
src/box/applier.cc | 4 ++--
src/box/box.cc | 8 ++++----
src/box/index.cc | 10 +++++-----
src/box/journal.c | 1 +
src/box/journal.h | 16 +++++++++++++++
src/box/memtx_space.c | 8 ++++----
src/box/sql.c | 2 +-
src/box/txn.c | 45 ++++++++++++++++++++++++------------------
src/box/txn.h | 9 ++++-----
src/box/vy_scheduler.c | 6 +++---
src/box/wal.c | 5 ++++-
11 files changed, 70 insertions(+), 44 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 42b6efc4d..43b1a84bc 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -201,8 +201,8 @@ static int
process_nop(struct request *request)
{
assert(request->type == IPROTO_NOP);
- struct txn *txn = txn_begin_stmt(NULL);
- if (txn == NULL)
+ struct txn *txn = in_txn();
+ if (txn_begin_stmt(txn, NULL) == NULL)
return -1;
return txn_commit_stmt(txn, request);
}
diff --git a/src/box/box.cc b/src/box/box.cc
index 835a00c95..46cd444fd 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -178,10 +178,10 @@ box_process_rw(struct request *request, struct space *space,
rmean_collect(rmean_box, request->type, 1);
if (access_check_space(space, PRIV_W) != 0)
goto fail;
- if (txn_begin_stmt(space) == NULL)
+ if (txn_begin_stmt(txn, space) == NULL)
goto fail;
if (space_execute_dml(space, txn, request, &tuple) != 0) {
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
goto fail;
}
/*
@@ -1087,7 +1087,7 @@ box_select(uint32_t space_id, uint32_t index_id,
struct iterator *it = index_create_iterator(index, type,
key, part_count);
if (it == NULL) {
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return -1;
}
@@ -1112,7 +1112,7 @@ box_select(uint32_t space_id, uint32_t index_id,
if (rc != 0) {
port_destroy(port);
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return -1;
}
txn_commit_ro_stmt(txn);
diff --git a/src/box/index.cc b/src/box/index.cc
index 2817d076d..baee0c1cb 100644
--- a/src/box/index.cc
+++ b/src/box/index.cc
@@ -239,7 +239,7 @@ box_index_get(uint32_t space_id, uint32_t index_id, const char *key,
if (txn_begin_ro_stmt(space, &txn) != 0)
return -1;
if (index_get(index, key, part_count, result) != 0) {
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return -1;
}
txn_commit_ro_stmt(txn);
@@ -273,7 +273,7 @@ box_index_min(uint32_t space_id, uint32_t index_id, const char *key,
if (txn_begin_ro_stmt(space, &txn) != 0)
return -1;
if (index_min(index, key, part_count, result) != 0) {
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return -1;
}
txn_commit_ro_stmt(txn);
@@ -305,7 +305,7 @@ box_index_max(uint32_t space_id, uint32_t index_id, const char *key,
if (txn_begin_ro_stmt(space, &txn) != 0)
return -1;
if (index_max(index, key, part_count, result) != 0) {
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return -1;
}
txn_commit_ro_stmt(txn);
@@ -339,7 +339,7 @@ box_index_count(uint32_t space_id, uint32_t index_id, int type,
return -1;
ssize_t count = index_count(index, itype, key, part_count);
if (count < 0) {
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return -1;
}
txn_commit_ro_stmt(txn);
@@ -376,7 +376,7 @@ box_index_iterator(uint32_t space_id, uint32_t index_id, int type,
struct iterator *it = index_create_iterator(index, itype,
key, part_count);
if (it == NULL) {
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return NULL;
}
txn_commit_ro_stmt(txn);
diff --git a/src/box/journal.c b/src/box/journal.c
index fe13fb6ee..5cffc7452 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -70,6 +70,7 @@ journal_entry_new(size_t n_rows, struct region *region)
entry->n_rows = n_rows;
entry->res = -1;
entry->fiber = fiber();
+ rlist_create(&entry->on_error);
return entry;
}
diff --git a/src/box/journal.h b/src/box/journal.h
index 8ac32ee5e..94be0b007 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -32,6 +32,7 @@
*/
#include <stdint.h>
#include <stdbool.h>
+#include "trigger.h"
#include "salad/stailq.h"
#if defined(__cplusplus)
@@ -58,6 +59,12 @@ struct journal_entry {
* The fiber issuing the request.
*/
struct fiber *fiber;
+ /**
+ * A trigger list to call if write failed. Triggers are going to be
+ * fired before any other processing and are a good place to implement
+ * any rollback non-yielding behavior.
+ */
+ struct rlist on_error;
/**
* Approximate size of this request when encoded.
*/
@@ -82,6 +89,15 @@ struct region;
struct journal_entry *
journal_entry_new(size_t n_rows, struct region *region);
+/**
+ * Add an on_error trigger to a journal entry.
+ */
+static inline void
+journal_entry_on_error(struct journal_entry *entry, struct trigger *trigger)
+{
+ trigger_add(&entry->on_error, trigger);
+}
+
/**
* An API for an abstract journal for all transactions of this
* instance, as well as for multiple instances in case of
diff --git a/src/box/memtx_space.c b/src/box/memtx_space.c
index a28204d30..f2b28fb71 100644
--- a/src/box/memtx_space.c
+++ b/src/box/memtx_space.c
@@ -310,10 +310,10 @@ memtx_space_apply_initial_join_row(struct space *space, struct request *request)
return -1;
}
request->header->replica_id = 0;
- struct txn *txn = txn_begin_stmt(space);
- if (txn == NULL)
+ struct txn *txn = in_txn();
+ struct txn_stmt *stmt = txn_begin_stmt(txn, space);
+ if (stmt == NULL)
return -1;
- struct txn_stmt *stmt = txn_current_stmt(txn);
stmt->new_tuple = memtx_tuple_new(space->format, request->tuple,
request->tuple_end);
if (stmt->new_tuple == NULL)
@@ -326,7 +326,7 @@ memtx_space_apply_initial_join_row(struct space *space, struct request *request)
rollback:
say_error("rollback: %s", diag_last_error(diag_get())->errmsg);
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return -1;
}
diff --git a/src/box/sql.c b/src/box/sql.c
index 1fb93e106..0051b93c0 100644
--- a/src/box/sql.c
+++ b/src/box/sql.c
@@ -911,7 +911,7 @@ cursor_seek(BtCursor *pCur, int *pRes)
part_count);
if (it == NULL) {
if (txn != NULL)
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
pCur->eState = CURSOR_INVALID;
return SQL_TARANTOOL_ITERATOR_FAIL;
}
diff --git a/src/box/txn.c b/src/box/txn.c
index fdfffa144..8f5b66480 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -218,14 +218,10 @@ txn_begin_in_engine(struct engine *engine, struct txn *txn)
return 0;
}
-struct txn *
-txn_begin_stmt(struct space *space)
+struct txn_stmt *
+txn_begin_stmt(struct txn *txn, struct space *space)
{
- struct txn *txn = in_txn();
- if (txn == NULL) {
- diag_set(ClientError, ER_NO_TRANSACTION);
- return NULL;
- } else if (txn->in_sub_stmt > TXN_SUB_STMT_MAX) {
+ if (txn->in_sub_stmt > TXN_SUB_STMT_MAX) {
diag_set(ClientError, ER_SUB_STMT_MAX);
return NULL;
}
@@ -233,7 +229,7 @@ txn_begin_stmt(struct space *space)
if (stmt == NULL)
return NULL;
if (space == NULL)
- return txn;
+ return stmt;
if (trigger_run(&space->on_stmt_begin, txn) != 0)
goto fail;
@@ -246,9 +242,9 @@ txn_begin_stmt(struct space *space)
if (engine_begin_statement(engine, txn) != 0)
goto fail;
- return txn;
+ return stmt;
fail:
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return NULL;
}
@@ -311,10 +307,23 @@ txn_commit_stmt(struct txn *txn, struct request *request)
--txn->in_sub_stmt;
return 0;
fail:
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return -1;
}
+/*
+ * Callback called if journal write failed.
+ */
+static void
+journal_write_error_cb(struct trigger *trigger, void *event)
+{
+ (void) event;
+ struct txn *txn = (struct txn *)trigger->data;
+ if (txn->engine)
+ engine_rollback(txn->engine, txn);
+ txn->engine = NULL;
+}
+
static int64_t
txn_write_to_wal(struct txn *txn)
{
@@ -326,6 +335,10 @@ txn_write_to_wal(struct txn *txn)
if (req == NULL)
return -1;
+ struct trigger on_error;
+ trigger_create(&on_error, journal_write_error_cb, txn, NULL);
+ journal_entry_on_error(req, &on_error);
+
struct txn_stmt *stmt;
struct xrow_header **remote_row = req->rows;
struct xrow_header **local_row = req->rows + txn->n_applier_rows;
@@ -348,12 +361,6 @@ txn_write_to_wal(struct txn *txn)
if (res < 0) {
/* Cascading rollback. */
txn_rollback(txn); /* Perform our part of cascading rollback. */
- /*
- * Move fiber to end of event loop to avoid
- * execution of any new requests before all
- * pending rollbacks are processed.
- */
- fiber_reschedule();
diag_set(ClientError, ER_WAL_IO);
diag_log();
} else if (stop - start > too_long_threshold) {
@@ -429,9 +436,8 @@ fail:
}
void
-txn_rollback_stmt()
+txn_rollback_stmt(struct txn *txn)
{
- struct txn *txn = in_txn();
if (txn == NULL || txn->in_sub_stmt == 0)
return;
txn->in_sub_stmt--;
@@ -449,6 +455,7 @@ txn_rollback(struct txn *txn)
unreachable();
panic("rollback trigger failed");
}
+
if (txn->engine)
engine_rollback(txn->engine, txn);
diff --git a/src/box/txn.h b/src/box/txn.h
index ae2d7b9a5..96719638b 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -261,11 +261,10 @@ txn_on_rollback(struct txn *txn, struct trigger *trigger)
}
/**
- * Start a new statement. If no current transaction,
- * start a new transaction with autocommit = true.
+ * Start a new statement.
*/
-struct txn *
-txn_begin_stmt(struct space *space);
+struct txn_stmt *
+txn_begin_stmt(struct txn *txn, struct space *space);
int
txn_begin_in_engine(struct engine *engine, struct txn *txn);
@@ -324,7 +323,7 @@ txn_commit_stmt(struct txn *txn, struct request *request);
* rolls back the entire transaction.
*/
void
-txn_rollback_stmt();
+txn_rollback_stmt(struct txn *txn);
/**
* Raise an error if this is a multi-statement transaction: DDL
diff --git a/src/box/vy_scheduler.c b/src/box/vy_scheduler.c
index 1f6b30f4a..053558c8c 100644
--- a/src/box/vy_scheduler.c
+++ b/src/box/vy_scheduler.c
@@ -840,14 +840,14 @@ vy_deferred_delete_process_one(struct space *deferred_delete_space,
tuple_unref(delete);
- struct txn *txn = txn_begin_stmt(deferred_delete_space);
- if (txn == NULL)
+ struct txn *txn = in_txn();
+ if (txn_begin_stmt(txn, deferred_delete_space) == NULL)
return -1;
struct tuple *unused;
if (space_execute_dml(deferred_delete_space, txn,
&request, &unused) != 0) {
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return -1;
}
return txn_commit_stmt(txn, &request);
diff --git a/src/box/wal.c b/src/box/wal.c
index ad8ff7c62..8dfa3ef27 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -300,7 +300,10 @@ tx_schedule_rollback(struct cmsg *msg)
* in-memory database state.
*/
stailq_reverse(&writer->rollback);
- /* Must not yield. */
+ /* Call error callback for each request before any scheduling. */
+ struct journal_entry *req;
+ stailq_foreach_entry(req, &writer->rollback, fifo)
+ trigger_run(&req->on_error, NULL);
tx_schedule_queue(&writer->rollback);
stailq_create(&writer->rollback);
}
--
2.21.0
* [tarantool-patches] [PATCH 07/10] Remove fiber from a journal_entry structure
2019-04-19 12:43 [tarantool-patches] [PATCH 00/10] Transaction refactoring Georgy Kirichenko
` (5 preceding siblings ...)
2019-04-19 12:44 ` [tarantool-patches] [PATCH 06/10] Require for txn in case of txn_begin_stmt/txn_rollback_stmt Georgy Kirichenko
@ 2019-04-19 12:44 ` Georgy Kirichenko
2019-04-24 19:16 ` [tarantool-patches] " Konstantin Osipov
2019-04-19 12:44 ` [tarantool-patches] [PATCH 08/10] Use mempool to alloc wal messages Georgy Kirichenko
` (2 subsequent siblings)
9 siblings, 1 reply; 21+ messages in thread
From: Georgy Kirichenko @ 2019-04-19 12:44 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
Use a trigger to handle the journal entry write-done event. This relaxes
the coupling between the fiber and transaction life cycles.
Prerequisites: #1254
---
src/box/journal.c | 4 ++--
src/box/journal.h | 16 ++++++++++++++--
src/box/wal.c | 36 +++++++++++++++++++++++++-----------
3 files changed, 41 insertions(+), 15 deletions(-)
diff --git a/src/box/journal.c b/src/box/journal.c
index 5cffc7452..b0f4d48b5 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -30,7 +30,6 @@
*/
#include "journal.h"
#include <small/region.h>
-#include <fiber.h>
#include <diag.h>
/**
@@ -66,11 +65,12 @@ journal_entry_new(size_t n_rows, struct region *region)
diag_set(OutOfMemory, size, "region", "struct journal_entry");
return NULL;
}
+ rlist_create(&entry->done_trigger);
entry->approx_len = 0;
entry->n_rows = n_rows;
entry->res = -1;
- entry->fiber = fiber();
rlist_create(&entry->on_error);
+ entry->done = false;
return entry;
}
diff --git a/src/box/journal.h b/src/box/journal.h
index 94be0b007..4a2fb3585 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -34,6 +34,7 @@
#include <stdbool.h>
#include "trigger.h"
#include "salad/stailq.h"
+#include "trigger.h"
#if defined(__cplusplus)
extern "C" {
@@ -56,9 +57,14 @@ struct journal_entry {
*/
int64_t res;
/**
- * The fiber issuing the request.
+ * Set to true when the entry has been processed by the WAL.
+ */
+ bool done;
+ /**
+ * Triggers fired when journal entry processing is done,
+ * regardless of whether it succeeded.
*/
- struct fiber *fiber;
+ struct rlist done_trigger;
/**
* A trigger list to call if write failed. Triggers are going to be
* fired before any other processing and are a good place to implement
@@ -109,6 +115,12 @@ struct journal {
void (*destroy)(struct journal *journal);
};
+static inline void
+journal_entry_on_done(struct journal_entry *entry, struct trigger *trigger)
+{
+ trigger_add(&entry->done_trigger, trigger);
+}
+
/**
* Depending on the step of recovery and instance configuration
* points at a concrete implementation of the journal.
diff --git a/src/box/wal.c b/src/box/wal.c
index 8dfa3ef27..6ccc1220a 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -258,8 +258,10 @@ tx_schedule_queue(struct stailq *queue)
* are many ready fibers.
*/
struct journal_entry *req;
- stailq_foreach_entry(req, queue, fifo)
- fiber_wakeup(req->fiber);
+ stailq_foreach_entry(req, queue, fifo) {
+ req->done = true;
+ trigger_run(&req->done_trigger, NULL);
+ }
}
/**
@@ -1116,6 +1118,14 @@ wal_writer_f(va_list ap)
return 0;
}
+static void
+on_wal_write_done(struct trigger *trigger, void *event)
+{
+ (void) event;
+ struct fiber_cond *cond = (struct fiber_cond *)trigger->data;
+ fiber_cond_signal(cond);
+}
+
/**
* WAL writer main entry point: queue a single request
* to be written to disk and wait until this task is completed.
@@ -1167,15 +1177,19 @@ wal_write(struct journal *journal, struct journal_entry *entry)
batch->approx_len += entry->approx_len;
writer->wal_pipe.n_input += entry->n_rows * XROW_IOVMAX;
cpipe_flush_input(&writer->wal_pipe);
- /**
- * It's not safe to spuriously wakeup this fiber
- * since in that case it will ignore a possible
- * error from WAL writer and not roll back the
- * transaction.
- */
- bool cancellable = fiber_set_cancellable(false);
- fiber_yield(); /* Request was inserted. */
- fiber_set_cancellable(cancellable);
+
+ struct fiber_cond done_cond;
+ fiber_cond_create(&done_cond);
+
+ struct trigger done_trigger;
+ trigger_create(&done_trigger, on_wal_write_done, &done_cond, NULL);
+ journal_entry_on_done(entry, &done_trigger);
+ while (!entry->done)
+ fiber_cond_wait(&done_cond);
+
+ fiber_cond_destroy(&done_cond);
+ trigger_clear(&done_trigger);
+
return entry->res;
}
--
2.21.0
^ permalink raw reply [flat|nested] 21+ messages in thread
* [tarantool-patches] Re: [PATCH 07/10] Remove fiber from a journal_entry structure
2019-04-19 12:44 ` [tarantool-patches] [PATCH 07/10] Remove fiber from a journal_entry structure Georgy Kirichenko
@ 2019-04-24 19:16 ` Konstantin Osipov
0 siblings, 0 replies; 21+ messages in thread
From: Konstantin Osipov @ 2019-04-24 19:16 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
* Georgy Kirichenko <georgy@tarantool.org> [19/04/19 20:00]:
> Use a trigger to handle the journal entry write completion event. This
> reduces coupling between the fiber and transaction life cycles.
I think the idea of journal entry triggers is great, but I also
think it's a bit of overkill to always initialize triggers and
run all fibers through the cond. It would be ideal to keep using
simple journal_entry objects for most cases and to use journal_entry
with triggers only when we actually need them. Would that be
possible to do? We could keep the trigger-related members in the
journal_entry class and initialize and use them only when strictly
needed, or we could make a derived structure which contains the
trigger state and actions.
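For example, such a derived structure could look roughly like this (purely a
sketch, all names hypothetical):

struct journal_entry_sync {
	/* Plain entry, used as before by callers that never wait. */
	struct journal_entry base;
	/* Wait state, set up only on the synchronous path. */
	struct fiber_cond done_cond;
	struct trigger done_trigger;
};

Only synchronous callers would then pay for the cond and the trigger, while
plain journal_entry stays as small as it is today.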
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
^ permalink raw reply [flat|nested] 21+ messages in thread
* [tarantool-patches] [PATCH 08/10] Use mempool to alloc wal messages
2019-04-19 12:43 [tarantool-patches] [PATCH 00/10] Transaction refactoring Georgy Kirichenko
` (6 preceding siblings ...)
2019-04-19 12:44 ` [tarantool-patches] [PATCH 07/10] Remove fiber from a journal_entry structure Georgy Kirichenko
@ 2019-04-19 12:44 ` Georgy Kirichenko
2019-04-24 19:18 ` [tarantool-patches] " Konstantin Osipov
2019-04-19 12:44 ` [tarantool-patches] [PATCH 09/10] Enable asynchronous wal writes Georgy Kirichenko
2019-04-19 12:44 ` [tarantool-patches] [PATCH 10/10] Introduce asynchronous txn commit Georgy Kirichenko
9 siblings, 1 reply; 21+ messages in thread
From: Georgy Kirichenko @ 2019-04-19 12:44 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
Don't use the fiber gc region to allocate WAL messages. This reduces
coupling between the fiber life cycle and transaction processing.
Prerequisites: #1254
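The allocation pattern the patch switches to is roughly the following (a
sketch; the real call sites are in the diff below):

/* Once, in wal_writer_create(): a pool of fixed-size wal messages. */
mempool_create(&writer->msg_pool, &cord()->slabc, sizeof(struct wal_msg));

/* Per batch, in wal_write(): allocate from the pool instead of fiber()->gc. */
struct wal_msg *batch = (struct wal_msg *) mempool_alloc(&writer->msg_pool);
if (batch == NULL) {
	diag_set(OutOfMemory, sizeof(struct wal_msg),
		 "mempool", "struct wal_msg");
	return -1;
}

/* Once the batch has been scheduled back in tx, return it to the pool. */
mempool_free(&writer->msg_pool, batch);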
---
src/box/wal.c | 12 ++++++++++--
1 file changed, 10 insertions(+), 2 deletions(-)
diff --git a/src/box/wal.c b/src/box/wal.c
index 6ccc1220a..f0352e938 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -89,6 +89,8 @@ struct wal_writer
struct stailq rollback;
/** A pipe from 'tx' thread to 'wal' */
struct cpipe wal_pipe;
+ /** A memory pool for messages. */
+ struct mempool msg_pool;
/* ----------------- wal ------------------- */
/** A setting from instance configuration - rows_per_wal */
int64_t wal_max_rows;
@@ -287,6 +289,7 @@ tx_schedule_commit(struct cmsg *msg)
/* Update the tx vclock to the latest written by wal. */
vclock_copy(&replicaset.vclock, &batch->vclock);
tx_schedule_queue(&batch->commit);
+ mempool_free(&writer->msg_pool, container_of(msg, struct wal_msg, base));
}
static void
@@ -308,6 +311,9 @@ tx_schedule_rollback(struct cmsg *msg)
trigger_run(&req->on_error, NULL);
tx_schedule_queue(&writer->rollback);
stailq_create(&writer->rollback);
+ if (msg != &writer->in_rollback)
+ mempool_free(&writer->msg_pool,
+ container_of(msg, struct wal_msg, base));
}
@@ -378,6 +384,9 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
writer->on_garbage_collection = on_garbage_collection;
writer->on_checkpoint_threshold = on_checkpoint_threshold;
+
+ mempool_create(&writer->msg_pool, &cord()->slabc,
+ sizeof(struct wal_msg));
}
/** Destroy a WAL writer structure. */
@@ -1158,8 +1167,7 @@ wal_write(struct journal *journal, struct journal_entry *entry)
stailq_add_tail_entry(&batch->commit, entry, fifo);
} else {
- batch = (struct wal_msg *)
- region_alloc(&fiber()->gc, sizeof(struct wal_msg));
+ batch = (struct wal_msg *)mempool_alloc(&writer->msg_pool);
if (batch == NULL) {
diag_set(OutOfMemory, sizeof(struct wal_msg),
"region", "struct wal_msg");
--
2.21.0
^ permalink raw reply [flat|nested] 21+ messages in thread
* [tarantool-patches] [PATCH 09/10] Enable asynchronous wal writes
2019-04-19 12:43 [tarantool-patches] [PATCH 00/10] Transaction refactoring Georgy Kirichenko
` (7 preceding siblings ...)
2019-04-19 12:44 ` [tarantool-patches] [PATCH 08/10] Use mempool to alloc wal messages Georgy Kirichenko
@ 2019-04-19 12:44 ` Georgy Kirichenko
2019-04-24 19:19 ` [tarantool-patches] " Konstantin Osipov
2019-04-19 12:44 ` [tarantool-patches] [PATCH 10/10] Introduce asynchronous txn commit Georgy Kirichenko
9 siblings, 1 reply; 21+ messages in thread
From: Georgy Kirichenko @ 2019-04-19 12:44 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
Allow sending a journal entry to the WAL without waiting until the write
is finished. Two methods are introduced:
* async_write submits an entry to be written and returns 0 if the
entry was successfully scheduled;
* async_wait waits until the write is finished and returns the
result of the journal write.
Prerequisites: #1254
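With these two methods a synchronous write decomposes into schedule-then-wait,
which is exactly how wal_write() is reimplemented below; from a caller's point
of view (sketch):

/* Schedule the entry; fail early if it could not even be queued. */
if (journal_async_write(entry) < 0)
	return -1;
/*
 * Block until the wal thread reports back; returns the vclock
 * signature of the entry, or -1 on a write error.
 */
return journal_async_wait(entry);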
---
src/box/box.cc | 21 ++++++++++++++++++++-
src/box/journal.c | 18 ++++++++++++++++++
src/box/journal.h | 30 ++++++++++++++++++++++++++++++
src/box/wal.c | 42 +++++++++++++++++++++++++++++++++++-------
4 files changed, 103 insertions(+), 8 deletions(-)
diff --git a/src/box/box.cc b/src/box/box.cc
index 46cd444fd..88be886f3 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -311,10 +311,29 @@ recovery_journal_write(struct journal *base,
return vclock_sum(journal->vclock);
}
+static int64_t
+recovery_journal_async_write(struct journal *base,
+ struct journal_entry * /* entry */)
+{
+ (void) base;
+ return 0;
+}
+
+static int64_t
+recovery_journal_async_wait(struct journal *base,
+ struct journal_entry * /* entry */)
+{
+ struct recovery_journal *journal = (struct recovery_journal *) base;
+ return vclock_sum(journal->vclock);
+}
+
static inline void
recovery_journal_create(struct recovery_journal *journal, struct vclock *v)
{
- journal_create(&journal->base, recovery_journal_write, NULL);
+ journal_create(&journal->base, recovery_journal_write,
+ recovery_journal_async_write,
+ recovery_journal_async_wait,
+ NULL);
journal->vclock = v;
}
diff --git a/src/box/journal.c b/src/box/journal.c
index b0f4d48b5..7ccbd8594 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -44,8 +44,26 @@ dummy_journal_write(struct journal *journal, struct journal_entry *entry)
return 0;
}
+static int64_t
+dummy_async_write(struct journal *journal, struct journal_entry *entry)
+{
+ (void) journal;
+ (void) entry;
+ return 0;
+}
+
+static int64_t
+dummy_async_wait(struct journal *journal, struct journal_entry *entry)
+{
+ (void) journal;
+ (void) entry;
+ return 0;
+}
+
static struct journal dummy_journal = {
dummy_journal_write,
+ dummy_async_write,
+ dummy_async_wait,
NULL,
};
diff --git a/src/box/journal.h b/src/box/journal.h
index 4a2fb3585..0292d77f3 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -112,6 +112,10 @@ journal_entry_on_error(struct journal_entry *entry, struct trigger *trigger)
struct journal {
int64_t (*write)(struct journal *journal,
struct journal_entry *req);
+ int64_t (*async_write)(struct journal *journal,
+ struct journal_entry *req);
+ int64_t (*async_wait)(struct journal *journal,
+ struct journal_entry *req);
void (*destroy)(struct journal *journal);
};
@@ -139,6 +143,28 @@ journal_write(struct journal_entry *entry)
return current_journal->write(current_journal, entry);
}
+/**
+ * Submit a single entry to be written.
+ *
+ * @return 0 if the write was scheduled or -1 on error.
+ */
+static inline int64_t
+journal_async_write(struct journal_entry *entry)
+{
+ return current_journal->async_write(current_journal, entry);
+}
+
+/**
+ * Wait until entry processing has finished.
+ * @return a log sequence number (vclock signature) of the entry
+ * or -1 on error.
+ */
+static inline int64_t
+journal_async_wait(struct journal_entry *entry)
+{
+ return current_journal->async_wait(current_journal, entry);
+}
+
/**
* Change the current implementation of the journaling API.
* Happens during life cycle of an instance:
@@ -171,9 +197,13 @@ journal_set(struct journal *new_journal)
static inline void
journal_create(struct journal *journal,
int64_t (*write)(struct journal *, struct journal_entry *),
+ int64_t (*async_write)(struct journal *, struct journal_entry *),
+ int64_t (*async_wait)(struct journal *, struct journal_entry *),
void (*destroy)(struct journal *))
{
journal->write = write;
+ journal->async_write = async_write;
+ journal->async_wait = async_wait;
journal->destroy = destroy;
}
diff --git a/src/box/wal.c b/src/box/wal.c
index f0352e938..39b049b06 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -63,6 +63,12 @@ int wal_dir_lock = -1;
static int64_t
wal_write(struct journal *, struct journal_entry *);
+static int64_t
+wal_async_write(struct journal *, struct journal_entry *);
+
+static int64_t
+wal_async_wait(struct journal *, struct journal_entry *);
+
static int64_t
wal_write_in_wal_mode_none(struct journal *, struct journal_entry *);
@@ -362,7 +368,10 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
writer->wal_max_rows = wal_max_rows;
writer->wal_max_size = wal_max_size;
journal_create(&writer->base, wal_mode == WAL_NONE ?
- wal_write_in_wal_mode_none : wal_write, NULL);
+ wal_write_in_wal_mode_none : wal_write,
+ wal_mode == WAL_NONE ?
+ wal_write_in_wal_mode_none: wal_async_write,
+ wal_async_wait, NULL);
struct xlog_opts opts = xlog_opts_default;
opts.sync_is_async = true;
@@ -1135,12 +1144,8 @@ on_wal_write_done(struct trigger *trigger, void *event)
fiber_cond_signal(cond);
}
-/**
- * WAL writer main entry point: queue a single request
- * to be written to disk and wait until this task is completed.
- */
int64_t
-wal_write(struct journal *journal, struct journal_entry *entry)
+wal_async_write(struct journal *journal, struct journal_entry *entry)
{
struct wal_writer *writer = (struct wal_writer *) journal;
@@ -1185,6 +1190,15 @@ wal_write(struct journal *journal, struct journal_entry *entry)
batch->approx_len += entry->approx_len;
writer->wal_pipe.n_input += entry->n_rows * XROW_IOVMAX;
cpipe_flush_input(&writer->wal_pipe);
+ return 0;
+}
+
+int64_t
+wal_async_wait(struct journal *journal, struct journal_entry *entry)
+{
+ (void) journal;
+ if (entry->done)
+ return entry->res;
struct fiber_cond done_cond;
fiber_cond_create(&done_cond);
@@ -1201,6 +1215,18 @@ wal_write(struct journal *journal, struct journal_entry *entry)
return entry->res;
}
+/**
+ * WAL writer main entry point: queue a single request
+ * to be written to disk and wait until this task is completed.
+ */
+int64_t
+wal_write(struct journal *journal, struct journal_entry *entry)
+{
+ if (wal_async_write(journal, entry) != 0)
+ return -1;
+ return wal_async_wait(journal, entry);
+}
+
int64_t
wal_write_in_wal_mode_none(struct journal *journal,
struct journal_entry *entry)
@@ -1212,7 +1238,9 @@ wal_write_in_wal_mode_none(struct journal *journal,
entry->rows + entry->n_rows);
vclock_merge(&writer->vclock, &vclock_diff);
vclock_copy(&replicaset.vclock, &writer->vclock);
- return vclock_sum(&writer->vclock);
+ entry->done = true;
+ entry->res = vclock_sum(&writer->vclock);
+ return entry->res;
}
void
--
2.21.0
^ permalink raw reply [flat|nested] 21+ messages in thread
* [tarantool-patches] Re: [PATCH 09/10] Enable asynchronous wal writes
2019-04-19 12:44 ` [tarantool-patches] [PATCH 09/10] Enable asynchronous wal writes Georgy Kirichenko
@ 2019-04-24 19:19 ` Konstantin Osipov
0 siblings, 0 replies; 21+ messages in thread
From: Konstantin Osipov @ 2019-04-24 19:19 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
* Georgy Kirichenko <georgy@tarantool.org> [19/04/19 20:00]:
> Allow sending a journal entry to the WAL without waiting until the write
> is finished. Two methods are introduced:
> * async_write submits an entry to be written and returns 0 if the
> entry was successfully scheduled;
> * async_wait waits until the write is finished and returns the
> result of the journal write.
>
The approach is looking good. I may have a few nits about the API
(will send them in a second review, separately).
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
^ permalink raw reply [flat|nested] 21+ messages in thread
* [tarantool-patches] [PATCH 10/10] Introduce asynchronous txn commit
2019-04-19 12:43 [tarantool-patches] [PATCH 00/10] Transaction refactoring Georgy Kirichenko
` (8 preceding siblings ...)
2019-04-19 12:44 ` [tarantool-patches] [PATCH 09/10] Enable asynchronous wal writes
@ 2019-04-19 12:44 ` Georgy Kirichenko
2019-04-24 19:20 ` [tarantool-patches] " Konstantin Osipov
9 siblings, 1 reply; 21+ messages in thread
From: Georgy Kirichenko @ 2019-04-19 12:44 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
Allow asynchronous transaction commit. This adds two functions:
* txn_async_commit, which sends a transaction to the journal;
* txn_async_wait, which waits until transaction processing is done.
Prerequisites: #1254
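As the diff shows, the old synchronous txn_commit() becomes a thin wrapper
over this pair. A caller that wants to overlap the WAL wait with other work
can split the two steps explicitly (a sketch; the bookkeeping in between is
hypothetical):

struct txn *txn = in_txn();
if (txn_async_commit(txn) != 0)
	return -1;	/* the transaction has already been rolled back */
/* ... non-transactional bookkeeping may go here ... */
if (txn_async_wait(txn) < 0)
	return -1;	/* WAL write failed, transaction rolled back */
return 0;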
---
src/box/txn.c | 121 +++++++++++++++++++++++++++++++++++++-------------
src/box/txn.h | 10 +++++
2 files changed, 99 insertions(+), 32 deletions(-)
diff --git a/src/box/txn.c b/src/box/txn.c
index 8f5b66480..eb57b861a 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -194,6 +194,7 @@ txn_begin()
txn->engine = NULL;
txn->engine_tx = NULL;
txn->psql_txn = NULL;
+ txn->entry = NULL;
/* fiber_on_yield/fiber_on_stop initialized by engine on demand */
fiber_set_txn(fiber(), txn);
return txn;
@@ -324,20 +325,24 @@ journal_write_error_cb(struct trigger *trigger, void *event)
txn->engine = NULL;
}
+/*
+ * Send the txn to a journal.
+ */
static int64_t
-txn_write_to_wal(struct txn *txn)
+txn_journal_async_write(struct txn *txn)
{
+ assert(txn->entry == NULL);
assert(txn->n_new_rows + txn->n_applier_rows > 0);
+ /* Prepare a journal entry. */
struct journal_entry *req = journal_entry_new(txn->n_new_rows +
txn->n_applier_rows,
&txn->region);
if (req == NULL)
return -1;
- struct trigger on_error;
- trigger_create(&on_error, journal_write_error_cb, txn, NULL);
- journal_entry_on_error(req, &on_error);
+ trigger_create(&txn->on_error, journal_write_error_cb, txn, NULL);
+ journal_entry_on_error(req, &txn->on_error);
struct txn_stmt *stmt;
struct xrow_header **remote_row = req->rows;
@@ -354,31 +359,34 @@ txn_write_to_wal(struct txn *txn)
assert(remote_row == req->rows + txn->n_applier_rows);
assert(local_row == remote_row + txn->n_new_rows);
- ev_tstamp start = ev_monotonic_now(loop());
- int64_t res = journal_write(req);
- ev_tstamp stop = ev_monotonic_now(loop());
-
- if (res < 0) {
- /* Cascading rollback. */
- txn_rollback(txn); /* Perform our part of cascading rollback. */
+ txn->entry = req;
+ /* Send entry to a journal. */
+ if (journal_async_write(txn->entry) < 0) {
diag_set(ClientError, ER_WAL_IO);
- diag_log();
- } else if (stop - start > too_long_threshold) {
- int n_rows = txn->n_new_rows + txn->n_applier_rows;
- say_warn_ratelimited("too long WAL write: %d rows at "
- "LSN %lld: %.3f sec", n_rows,
- res - n_rows + 1, stop - start);
+ return -1;
}
- /*
- * Use vclock_sum() from WAL writer as transaction signature.
- */
- return res;
+ return 0;
}
-int
-txn_commit(struct txn *txn)
+/*
+ * Wait until journal processing has finished.
+ */
+static int64_t
+txn_journal_async_wait(struct txn *txn)
+{
+ assert(txn->entry != NULL);
+ txn->signature = journal_async_wait(txn->entry);
+ if (txn->signature < 0)
+ diag_set(ClientError, ER_WAL_IO);
+ return txn->signature;
+}
+
+/*
+ * Prepare a transaction using engines.
+ */
+static int
+txn_prepare(struct txn *txn)
{
- assert(txn == in_txn());
/*
* If transaction has been started in SQL, deferred
* foreign key constraints must not be violated.
@@ -388,7 +396,7 @@ txn_commit(struct txn *txn)
struct sql_txn *sql_txn = txn->psql_txn;
if (sql_txn->fk_deferred_count != 0) {
diag_set(ClientError, ER_FOREIGN_KEY_CONSTRAINT);
- goto fail;
+ return -1;
}
}
/*
@@ -396,15 +404,54 @@ txn_commit(struct txn *txn)
* we have a bunch of IPROTO_NOP statements.
*/
if (txn->engine != NULL) {
- if (engine_prepare(txn->engine, txn) != 0)
- goto fail;
+ if (engine_prepare(txn->engine, txn) != 0) {
+ return -1;
+ }
}
+ return 0;
+}
- if (txn->n_new_rows + txn->n_applier_rows > 0) {
- txn->signature = txn_write_to_wal(txn);
- if (txn->signature < 0)
- return -1;
+/*
+ * Send a transaction to a journal.
+ */
+int
+txn_async_commit(struct txn *txn)
+{
+ assert(txn == in_txn());
+ if (txn_prepare(txn) != 0)
+ goto fail;
+
+ txn->start_tm = ev_monotonic_now(loop());
+ if (txn->n_new_rows + txn->n_applier_rows == 0)
+ return 0;
+
+ if (txn_journal_async_write(txn) != 0)
+ goto fail;
+
+ return 0;
+fail:
+ txn_rollback(txn);
+ return -1;
+}
+
+/*
+ * Wait until transaction processing has finished.
+ */
+int
+txn_async_wait(struct txn *txn)
+{
+ if (txn->n_new_rows + txn->n_applier_rows > 0 &&
+ txn_journal_async_wait(txn) < 0)
+ goto fail;
+ ev_tstamp stop_tm = ev_monotonic_now(loop());
+ if (stop_tm - txn->start_tm > too_long_threshold) {
+ int n_rows = txn->n_new_rows + txn->n_applier_rows;
+ say_warn_ratelimited("too long WAL write: %d rows at "
+ "LSN %lld: %.3f sec", n_rows,
+ txn->signature - n_rows + 1,
+ stop_tm - txn->start_tm);
}
+
/*
* The transaction is in the binary log. No action below
* may throw. In case an error has happened, there is
@@ -417,7 +464,7 @@ txn_commit(struct txn *txn)
panic("commit trigger failed");
}
/*
- * Engine can be NULL if transaction contains IPROTO_NOP
+ * Engine can be NULL if the transaction contains IPROTO_NOP
* statements only.
*/
if (txn->engine != NULL)
@@ -430,11 +477,21 @@ txn_commit(struct txn *txn)
fiber_set_txn(fiber(), NULL);
txn_free(txn);
return 0;
+
fail:
txn_rollback(txn);
return -1;
}
+int
+txn_commit(struct txn *txn)
+{
+ if (txn_async_commit(txn) != 0 ||
+ txn_async_wait(txn) < 0)
+ return -1;
+ return 0;
+}
+
void
txn_rollback_stmt(struct txn *txn)
{
diff --git a/src/box/txn.h b/src/box/txn.h
index 96719638b..09f086fa4 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -187,6 +187,16 @@ struct txn {
/** Commit and rollback triggers */
struct rlist on_commit, on_rollback;
struct sql_txn *psql_txn;
+ /**
+ * Trigger to call if write failed.
+ */
+ struct trigger on_error;
+ /**
+ * Journal entry to control txn write.
+ */
+ struct journal_entry *entry;
+ /** Timestamp of the entry write start. */
+ ev_tstamp start_tm;
};
/* Pointer to the current transaction (if any) */
--
2.21.0
^ permalink raw reply [flat|nested] 21+ messages in thread
* [tarantool-patches] Re: [PATCH 10/10] Introduce asynchronous txn commit
2019-04-19 12:44 ` [tarantool-patches] [PATCH 10/10] Introduce asynchronous txn commit Georgy Kirichenko
@ 2019-04-24 19:20 ` Konstantin Osipov
0 siblings, 0 replies; 21+ messages in thread
From: Konstantin Osipov @ 2019-04-24 19:20 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
* Georgy Kirichenko <georgy@tarantool.org> [19/04/19 20:00]:
> Allow asynchronous transaction commit. This adds two functions:
> * txn_async_commit, which sends a transaction to the journal;
> * txn_async_wait, which waits until transaction processing is done.
>
I see where you're heading. Could you please submit a parallel
applier patch using this stack? As you can see, most of the patches
are looking good. Thank you for the great work!
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
^ permalink raw reply [flat|nested] 21+ messages in thread