* [tarantool-patches] [PATCH 1/2] Introduce a txn memory region
2019-04-10 7:22 [tarantool-patches] [PATCH 0/2] Transaction refactoring Georgy Kirichenko
@ 2019-04-10 7:22 ` Georgy Kirichenko
2019-04-10 10:11 ` [tarantool-patches] " Konstantin Osipov
2019-04-10 7:22 ` [tarantool-patches] [PATCH 2/2] Get rid of aurocommit from a txn structure Georgy Kirichenko
1 sibling, 1 reply; 5+ messages in thread
From: Georgy Kirichenko @ 2019-04-10 7:22 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
Attach a separate memory region for each txn structure in order to store
all txn internal data until the transaction finished. This patch is a
preparation to detach a txn from a fiber and a fiber gc storage.
Prerequisites: #1254
---
src/box/errcode.h | 1 +
src/box/txn.c | 66 ++++++++++++++++++++++++++++++++++++--------
src/box/txn.h | 2 ++
test/box/misc.result | 1 +
4 files changed, 58 insertions(+), 12 deletions(-)
diff --git a/src/box/errcode.h b/src/box/errcode.h
index 3f8cb8e0e..bd1a684ab 100644
--- a/src/box/errcode.h
+++ b/src/box/errcode.h
@@ -246,6 +246,7 @@ struct errcode_record {
/*191 */_(ER_SQL_PARSER_LIMIT, "%s %d exceeds the limit (%d)") \
/*192 */_(ER_INDEX_DEF_UNSUPPORTED, "%s are prohibited in an index definition") \
/*193 */_(ER_CK_DEF_UNSUPPORTED, "%s are prohibited in a CHECK constraint definition") \
+ /*194 */_(ER_NO_TRANSACTION, "No active transaction") \
/*
* !IMPORTANT! Please follow instructions at start of the file
diff --git a/src/box/txn.c b/src/box/txn.c
index d51bfc67a..be5e209b6 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -37,6 +37,9 @@
double too_long_threshold;
+/* Txn cache. */
+static struct rlist txn_cache = RLIST_HEAD_INITIALIZER(txn_cache);
+
static inline void
fiber_set_txn(struct fiber *fiber, struct txn *txn)
{
@@ -44,7 +47,7 @@ fiber_set_txn(struct fiber *fiber, struct txn *txn)
}
static int
-txn_add_redo(struct txn_stmt *stmt, struct request *request)
+txn_add_redo(struct txn *txn, struct txn_stmt *stmt, struct request *request)
{
stmt->row = request->header;
if (request->header != NULL)
@@ -52,7 +55,7 @@ txn_add_redo(struct txn_stmt *stmt, struct request *request)
/* Create a redo log row for Lua requests */
struct xrow_header *row;
- row = region_alloc_object(&fiber()->gc, struct xrow_header);
+ row = region_alloc_object(&txn->region, struct xrow_header);
if (row == NULL) {
diag_set(OutOfMemory, sizeof(*row),
"region", "struct xrow_header");
@@ -77,7 +80,7 @@ static struct txn_stmt *
txn_stmt_new(struct txn *txn)
{
struct txn_stmt *stmt;
- stmt = region_alloc_object(&fiber()->gc, struct txn_stmt);
+ stmt = region_alloc_object(&txn->region, struct txn_stmt);
if (stmt == NULL) {
diag_set(OutOfMemory, sizeof(*stmt),
"region", "struct txn_stmt");
@@ -134,16 +137,49 @@ txn_rollback_to_svp(struct txn *txn, struct stailq_entry *svp)
}
}
+/*
+ * Return a txn from cache or create a new one if cache is empty.
+ */
+static struct txn *
+txn_new()
+{
+ if (!rlist_empty(&txn_cache))
+ return rlist_shift_entry(&txn_cache, struct txn, list);
+
+ /* Create a region. */
+ struct region region;
+ region_create(®ion, &cord()->slabc);
+
+ /* Place txn structure on the region. */
+ struct txn *txn = region_alloc_object(®ion, struct txn);
+ if (txn == NULL) {
+ diag_set(OutOfMemory, sizeof(*txn), "region", "struct txn");
+ return NULL;
+ }
+ txn->region = region;
+ return txn;
+}
+
+/*
+ * Free txn memory and return it to a cache.
+ */
+static void
+txn_free(struct txn *txn)
+{
+ /* Truncate region up to struct txn size. */
+ region_truncate(&txn->region, sizeof(struct txn));
+ rlist_add(&txn_cache, &txn->list);
+}
+
struct txn *
txn_begin(bool is_autocommit)
{
static int64_t tsn = 0;
assert(! in_txn());
- struct txn *txn = region_alloc_object(&fiber()->gc, struct txn);
- if (txn == NULL) {
- diag_set(OutOfMemory, sizeof(*txn), "region", "struct txn");
+ struct txn *txn = txn_new();
+ if (txn == NULL)
return NULL;
- }
+
/* Initialize members explicitly to save time on memset() */
stailq_create(&txn->stmts);
txn->n_new_rows = 0;
@@ -251,7 +287,7 @@ txn_commit_stmt(struct txn *txn, struct request *request)
* stmt->space can be NULL for IRPOTO_NOP.
*/
if (stmt->space == NULL || !space_is_temporary(stmt->space)) {
- if (txn_add_redo(stmt, request) != 0)
+ if (txn_add_redo(txn, stmt, request) != 0)
goto fail;
assert(stmt->row != NULL);
if (stmt->row->replica_id == 0) {
@@ -393,8 +429,8 @@ txn_commit(struct txn *txn)
stailq_foreach_entry(stmt, &txn->stmts, next)
txn_stmt_unref_tuples(stmt);
- TRASH(txn);
fiber_set_txn(fiber(), NULL);
+ txn_free(txn);
return 0;
fail:
txn_rollback();
@@ -433,10 +469,10 @@ txn_rollback()
stailq_foreach_entry(stmt, &txn->stmts, next)
txn_stmt_unref_tuples(stmt);
- TRASH(txn);
/** Free volatile txn memory. */
fiber_gc();
fiber_set_txn(fiber(), NULL);
+ txn_free(txn);
}
void
@@ -521,12 +557,18 @@ box_txn_rollback()
void *
box_txn_alloc(size_t size)
{
+ struct txn *txn = in_txn();
+ if (txn == NULL) {
+ /* There are no transaction yet - return an error. */
+ diag_set(ClientError, ER_NO_TRANSACTION);
+ return NULL;
+ }
union natural_align {
void *p;
double lf;
long l;
};
- return region_aligned_alloc(&fiber()->gc, size,
+ return region_aligned_alloc(&txn->region, size,
alignof(union natural_align));
}
@@ -539,7 +581,7 @@ box_txn_savepoint()
return NULL;
}
struct txn_savepoint *svp =
- (struct txn_savepoint *) region_alloc_object(&fiber()->gc,
+ (struct txn_savepoint *) region_alloc_object(&txn->region,
struct txn_savepoint);
if (svp == NULL) {
diag_set(OutOfMemory, sizeof(*svp),
diff --git a/src/box/txn.h b/src/box/txn.h
index 747b38db3..84c82af92 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -132,6 +132,8 @@ struct autoinc_id_entry {
};
struct txn {
+ struct rlist list;
+ struct region region;
/**
* A sequentially growing transaction id, assigned when
* a transaction is initiated. Used to identify
diff --git a/test/box/misc.result b/test/box/misc.result
index e2b618c9c..be3cbde37 100644
--- a/test/box/misc.result
+++ b/test/box/misc.result
@@ -522,6 +522,7 @@ t;
191: box.error.SQL_PARSER_LIMIT
192: box.error.INDEX_DEF_UNSUPPORTED
193: box.error.CK_DEF_UNSUPPORTED
+ 194: box.error.NO_TRANSACTION
...
test_run:cmd("setopt delimiter ''");
---
--
2.21.0
^ permalink raw reply [flat|nested] 5+ messages in thread
* [tarantool-patches] [PATCH 2/2] Get rid of aurocommit from a txn structure
2019-04-10 7:22 [tarantool-patches] [PATCH 0/2] Transaction refactoring Georgy Kirichenko
2019-04-10 7:22 ` [tarantool-patches] [PATCH 1/2] Introduce a txn memory region Georgy Kirichenko
@ 2019-04-10 7:22 ` Georgy Kirichenko
2019-04-11 6:43 ` [tarantool-patches] " Konstantin Osipov
1 sibling, 1 reply; 5+ messages in thread
From: Georgy Kirichenko @ 2019-04-10 7:22 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
Move transaction auto start and auto commit behavior to box level.
Now transaction won't start and commit automatically without
txn_begin/txn_commit invocations. This is a part of a bigger transaction
refactoring in order to implement detachable transactions and a parallel
applier.
Prerequisites: #1254
---
src/box/applier.cc | 27 +++++++++++++---
src/box/box.cc | 73 +++++++++++++++++++++++++++++++-----------
src/box/memtx_engine.c | 10 ++++--
src/box/txn.c | 25 ++++-----------
src/box/txn.h | 7 +---
src/box/vinyl.c | 12 +++----
6 files changed, 97 insertions(+), 57 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 373e1feb9..3b74e0f54 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -172,11 +172,22 @@ applier_writer_f(va_list ap)
static int
apply_initial_join_row(struct xrow_header *row)
{
+ struct txn *txn = txn_begin();
+ if (txn == NULL)
+ return -1;
struct request request;
xrow_decode_dml(row, &request, dml_request_key_map(row->type));
- struct space *space = space_cache_find_xc(request.space_id);
+ struct space *space = space_cache_find(request.space_id);
+ if (space == NULL)
+ return -1;
/* no access checks here - applier always works with admin privs */
- return space_apply_initial_join_row(space, &request);
+ if (space_apply_initial_join_row(space, &request)) {
+ txn_rollback();
+ return -1;
+ }
+ int rc = txn_commit(txn);
+ fiber_gc();
+ return rc;
}
/**
@@ -403,8 +414,16 @@ applier_join(struct applier *applier)
applier->last_row_time = ev_monotonic_now(loop());
if (iproto_type_is_dml(row.type)) {
vclock_follow_xrow(&replicaset.vclock, &row);
- if (apply_row(&row) != 0)
+ struct txn *txn = txn_begin();
+ if (txn == NULL)
+ diag_raise();
+ if (apply_row(&row) != 0) {
+ txn_rollback();
+ diag_raise();
+ }
+ if (txn_commit(txn) != 0)
diag_raise();
+ fiber_gc();
if (++row_count % 100000 == 0)
say_info("%.1fM rows received", row_count / 1e6);
} else if (row.type == IPROTO_OK) {
@@ -555,7 +574,7 @@ applier_apply_tx(struct stailq *rows)
* conflict safely access failed xrow object and allocate
* IPROTO_NOP on gc.
*/
- struct txn *txn = txn_begin(false);
+ struct txn *txn = txn_begin();
struct applier_tx_row *item;
if (txn == NULL)
diag_raise();
diff --git a/src/box/box.cc b/src/box/box.cc
index 7828f575b..6119ca078 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -169,34 +169,53 @@ int
box_process_rw(struct request *request, struct space *space,
struct tuple **result)
{
+ int rc = -1;
+ struct tuple *tuple = NULL;
+ struct txn *txn = in_txn();
+ bool autocommit = txn == NULL;
+ if (txn == NULL && (txn = txn_begin()) == NULL)
+ return -1;
assert(iproto_type_is_dml(request->type));
rmean_collect(rmean_box, request->type, 1);
if (access_check_space(space, PRIV_W) != 0)
- return -1;
- struct txn *txn = txn_begin_stmt(space);
- if (txn == NULL)
- return -1;
- struct tuple *tuple;
+ goto fail;
+ if (txn_begin_stmt(space) == NULL)
+ goto fail;
if (space_execute_dml(space, txn, request, &tuple) != 0) {
txn_rollback_stmt();
- return -1;
+ goto fail;
}
- if (result == NULL)
- return txn_commit_stmt(txn, request);
- *result = tuple;
- if (tuple == NULL)
- return txn_commit_stmt(txn, request);
/*
* Pin the tuple locally before the commit,
* otherwise it may go away during yield in
* when WAL is written in autocommit mode.
*/
- tuple_ref(tuple);
- int rc = txn_commit_stmt(txn, request);
- if (rc == 0)
+ if (tuple != NULL)
+ tuple_ref(tuple);
+
+ if (result != NULL)
+ *result = tuple;
+
+ if (txn_commit_stmt(txn, request) != 0 ||
+ (autocommit && txn_commit(txn) != 0))
+ goto fail;
+ if (autocommit)
+ fiber_gc();
+
+ if (tuple != NULL && result != NULL) {
tuple_bless(tuple);
- tuple_unref(tuple);
+ }
+
+ rc = 0;
+done:
+ if (tuple != NULL)
+ tuple_unref(tuple);
return rc;
+
+fail:
+ if (autocommit)
+ txn_rollback();
+ goto done;
}
void
@@ -299,8 +318,12 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row)
xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
if (request.type != IPROTO_NOP) {
struct space *space = space_cache_find_xc(request.space_id);
- if (box_process_rw(&request, space, NULL) != 0) {
+ struct txn *txn = txn_begin();
+ if (txn == NULL || box_process_rw(&request, space, NULL) != 0 ||
+ txn_commit(txn) != 0) {
say_error("error applying row: %s", request_str(&request));
+ if (txn != NULL)
+ txn_rollback();
diag_raise();
}
}
@@ -1313,8 +1336,15 @@ box_sequence_reset(uint32_t seq_id)
static inline void
box_register_replica(uint32_t id, const struct tt_uuid *uuid)
{
+ struct txn *txn = txn_begin();
+ if (txn == NULL)
+ diag_raise();
if (boxk(IPROTO_INSERT, BOX_CLUSTER_ID, "[%u%s]",
- (unsigned) id, tt_uuid_str(uuid)) != 0)
+ (unsigned) id, tt_uuid_str(uuid)) != 0) {
+ txn_rollback();
+ diag_raise();
+ }
+ if (txn_commit(txn) != 0)
diag_raise();
assert(replica_by_uuid(uuid)->id == id);
}
@@ -1636,9 +1666,16 @@ box_set_replicaset_uuid(const struct tt_uuid *replicaset_uuid)
uu = *replicaset_uuid;
else
tt_uuid_create(&uu);
+ struct txn *txn = txn_begin();
+ if (txn == NULL)
+ diag_raise();
/* Save replica set UUID in _schema */
if (boxk(IPROTO_INSERT, BOX_SCHEMA_ID, "[%s%s]", "cluster",
- tt_uuid_str(&uu)))
+ tt_uuid_str(&uu))) {
+ txn_rollback();
+ diag_raise();
+ }
+ if (txn_commit(txn) != 0)
diag_raise();
}
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index 924f8bbc4..f052657f7 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -264,16 +264,22 @@ memtx_engine_recover_snapshot_row(struct memtx_engine *memtx,
diag_set(ClientError, ER_CROSS_ENGINE_TRANSACTION);
return -1;
}
+ struct txn *txn = txn_begin();
+ if (txn == NULL)
+ return -1;
/* no access checks here - applier always works with admin privs */
- if (space_apply_initial_join_row(space, &request) != 0)
+ if (space_apply_initial_join_row(space, &request) != 0) {
+ txn_rollback();
return -1;
+ }
+ int rc = txn_commit(txn);
/*
* Don't let gc pool grow too much. Yet to
* it before reading the next row, to make
* sure it's not freed along here.
*/
fiber_gc();
- return 0;
+ return rc;
}
/** Called at start to tell memtx to recover to a given LSN. */
diff --git a/src/box/txn.c b/src/box/txn.c
index be5e209b6..2b44aba48 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -172,7 +172,7 @@ txn_free(struct txn *txn)
}
struct txn *
-txn_begin(bool is_autocommit)
+txn_begin()
{
static int64_t tsn = 0;
assert(! in_txn());
@@ -185,7 +185,6 @@ txn_begin(bool is_autocommit)
txn->n_new_rows = 0;
txn->n_local_rows = 0;
txn->n_applier_rows = 0;
- txn->is_autocommit = is_autocommit;
txn->has_triggers = false;
txn->is_aborted = false;
txn->in_sub_stmt = 0;
@@ -223,19 +222,15 @@ txn_begin_stmt(struct space *space)
{
struct txn *txn = in_txn();
if (txn == NULL) {
- txn = txn_begin(true);
- if (txn == NULL)
- return NULL;
+ diag_set(ClientError, ER_NO_TRANSACTION);
+ return NULL;
} else if (txn->in_sub_stmt > TXN_SUB_STMT_MAX) {
diag_set(ClientError, ER_SUB_STMT_MAX);
return NULL;
}
struct txn_stmt *stmt = txn_stmt_new(txn);
- if (stmt == NULL) {
- if (txn->is_autocommit && txn->in_sub_stmt == 0)
- txn_rollback();
+ if (stmt == NULL)
return NULL;
- }
if (space == NULL)
return txn;
@@ -270,8 +265,7 @@ txn_is_distributed(struct txn *txn)
}
/**
- * End a statement. In autocommit mode, end
- * the current transaction as well.
+ * End a statement.
*/
int
txn_commit_stmt(struct txn *txn, struct request *request)
@@ -314,11 +308,6 @@ txn_commit_stmt(struct txn *txn, struct request *request)
goto fail;
}
--txn->in_sub_stmt;
- if (txn->is_autocommit && txn->in_sub_stmt == 0) {
- int rc = txn_commit(txn);
- fiber_gc();
- return rc;
- }
return 0;
fail:
txn_rollback_stmt();
@@ -444,8 +433,6 @@ txn_rollback_stmt()
if (txn == NULL || txn->in_sub_stmt == 0)
return;
txn->in_sub_stmt--;
- if (txn->is_autocommit && txn->in_sub_stmt == 0)
- return txn_rollback();
txn_rollback_to_svp(txn, txn->sub_stmt_begin[txn->in_sub_stmt]);
}
@@ -516,7 +503,7 @@ box_txn_begin()
diag_set(ClientError, ER_ACTIVE_TRANSACTION);
return -1;
}
- if (txn_begin(false) == NULL)
+ if (txn_begin() == NULL)
return -1;
return 0;
}
diff --git a/src/box/txn.h b/src/box/txn.h
index 84c82af92..a304e11d9 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -154,11 +154,6 @@ struct txn {
* already assigned LSN.
*/
int n_applier_rows;
- /**
- * True if this transaction is running in autocommit mode
- * (statement end causes an automatic transaction commit).
- */
- bool is_autocommit;
/**
* True if the transaction was aborted so should be
* rolled back at commit.
@@ -206,7 +201,7 @@ in_txn()
* @pre no transaction is active
*/
struct txn *
-txn_begin(bool is_autocommit);
+txn_begin();
/**
* Commit a transaction.
diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index 1e9937613..8a90cd1c9 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -2399,10 +2399,8 @@ vinyl_engine_begin(struct engine *engine, struct txn *txn)
txn->engine_tx = vy_tx_begin(env->xm);
if (txn->engine_tx == NULL)
return -1;
- if (!txn->is_autocommit) {
- trigger_create(&txn->fiber_on_stop, txn_on_stop, NULL, NULL);
- trigger_add(&fiber()->on_stop, &txn->fiber_on_stop);
- }
+ trigger_create(&txn->fiber_on_stop, txn_on_stop, NULL, NULL);
+ trigger_add(&fiber()->on_stop, &txn->fiber_on_stop);
return 0;
}
@@ -2472,8 +2470,7 @@ vinyl_engine_commit(struct engine *engine, struct txn *txn)
vy_regulator_check_dump_watermark(&env->regulator);
txn->engine_tx = NULL;
- if (!txn->is_autocommit)
- trigger_clear(&txn->fiber_on_stop);
+ trigger_clear(&txn->fiber_on_stop);
}
static void
@@ -2487,8 +2484,7 @@ vinyl_engine_rollback(struct engine *engine, struct txn *txn)
vy_tx_rollback(tx);
txn->engine_tx = NULL;
- if (!txn->is_autocommit)
- trigger_clear(&txn->fiber_on_stop);
+ trigger_clear(&txn->fiber_on_stop);
}
static int
--
2.21.0
^ permalink raw reply [flat|nested] 5+ messages in thread