[tarantool-patches] [PATCH v2 2/8] Get rid of autocommit from a txn structure
Georgy Kirichenko
georgy at tarantool.org
Thu May 23 11:19:34 MSK 2019
Move transaction auto-start and auto-commit behavior to the box level.
From now on, a transaction won't start or commit automatically without
explicit txn_begin/txn_commit invocations. This is part of a bigger transaction
refactoring in order to implement detachable transactions and a parallel
applier.
Prerequisites: #1254
---
src/box/applier.cc | 31 +++++++++++---
src/box/box.cc | 91 ++++++++++++++++++++++++++++++++----------
src/box/index.cc | 10 ++---
src/box/journal.h | 1 +
src/box/memtx_engine.c | 10 ++++-
src/box/memtx_space.c | 8 ++--
src/box/sql.c | 2 +-
src/box/txn.c | 47 ++++++++--------------
src/box/txn.h | 16 +++-----
src/box/vinyl.c | 12 ++----
src/box/vy_scheduler.c | 6 +--
src/box/wal.c | 1 -
12 files changed, 142 insertions(+), 93 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 373e1feb9..9d2989094 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -172,11 +172,22 @@ applier_writer_f(va_list ap)
static int
apply_initial_join_row(struct xrow_header *row)
{
+ struct txn *txn = txn_begin();
+ if (txn == NULL)
+ return -1;
struct request request;
xrow_decode_dml(row, &request, dml_request_key_map(row->type));
- struct space *space = space_cache_find_xc(request.space_id);
+ struct space *space = space_cache_find(request.space_id);
+ if (space == NULL)
+ return -1;
/* no access checks here - applier always works with admin privs */
- return space_apply_initial_join_row(space, &request);
+ if (space_apply_initial_join_row(space, &request)) {
+ txn_rollback();
+ return -1;
+ }
+ int rc = txn_commit(txn);
+ fiber_gc();
+ return rc;
}
/**
@@ -189,8 +200,8 @@ static int
process_nop(struct request *request)
{
assert(request->type == IPROTO_NOP);
- struct txn *txn = txn_begin_stmt(NULL);
- if (txn == NULL)
+ struct txn *txn = in_txn();
+ if (txn_begin_stmt(txn, NULL) == NULL)
return -1;
return txn_commit_stmt(txn, request);
}
@@ -403,8 +414,16 @@ applier_join(struct applier *applier)
applier->last_row_time = ev_monotonic_now(loop());
if (iproto_type_is_dml(row.type)) {
vclock_follow_xrow(&replicaset.vclock, &row);
- if (apply_row(&row) != 0)
+ struct txn *txn = txn_begin();
+ if (txn == NULL)
+ diag_raise();
+ if (apply_row(&row) != 0) {
+ txn_rollback();
+ diag_raise();
+ }
+ if (txn_commit(txn) != 0)
diag_raise();
+ fiber_gc();
if (++row_count % 100000 == 0)
say_info("%.1fM rows received", row_count / 1e6);
} else if (row.type == IPROTO_OK) {
@@ -555,7 +574,7 @@ applier_apply_tx(struct stailq *rows)
* conflict safely access failed xrow object and allocate
* IPROTO_NOP on gc.
*/
- struct txn *txn = txn_begin(false);
+ struct txn *txn = txn_begin();
struct applier_tx_row *item;
if (txn == NULL)
diag_raise();
diff --git a/src/box/box.cc b/src/box/box.cc
index 57419ee01..2a738fa36 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -169,34 +169,65 @@ int
box_process_rw(struct request *request, struct space *space,
struct tuple **result)
{
+ struct tuple *tuple = NULL;
+ struct txn *txn = in_txn();
+ bool is_autocommit = txn == NULL;
+ if (is_autocommit && (txn = txn_begin()) == NULL)
+ return -1;
assert(iproto_type_is_dml(request->type));
rmean_collect(rmean_box, request->type, 1);
if (access_check_space(space, PRIV_W) != 0)
- return -1;
- struct txn *txn = txn_begin_stmt(space);
- if (txn == NULL)
- return -1;
- struct tuple *tuple;
+ goto fail;
+ if (txn_begin_stmt(txn, space) == NULL)
+ goto fail;
if (space_execute_dml(space, txn, request, &tuple) != 0) {
- txn_rollback_stmt();
- return -1;
+ txn_rollback_stmt(txn);
+ goto fail;
}
- if (result == NULL)
- return txn_commit_stmt(txn, request);
- *result = tuple;
- if (tuple == NULL)
- return txn_commit_stmt(txn, request);
/*
* Pin the tuple locally before the commit,
* otherwise it may go away during yield in
* when WAL is written in autocommit mode.
*/
+ if (result != NULL)
+ *result = tuple;
+
+ if (result == NULL || tuple == NULL) {
+ if (!is_autocommit)
+ return txn_commit_stmt(txn, request);
+ /* Autocommit mode. */
+ if (txn_commit_stmt(txn, request) != 0) {
+ txn_rollback();
+ return -1;
+ }
+ if (txn_commit(txn) != 0)
+ return -1;
+ fiber_gc();
+ return 0;
+ }
tuple_ref(tuple);
- int rc = txn_commit_stmt(txn, request);
- if (rc == 0)
- tuple_bless(tuple);
+
+ if (txn_commit_stmt(txn, request)) {
+ /* Unref tuple and rollback if autocommit. */
+ tuple_unref(tuple);
+ goto fail;
+ }
+ if (is_autocommit) {
+ if (txn_commit(txn) != 0) {
+ /* Unref tuple and exit. */
+ tuple_unref(tuple);
+ return -1;
+ }
+ fiber_gc();
+ }
+ tuple_bless(tuple);
tuple_unref(tuple);
- return rc;
+ return 0;
+
+fail:
+ if (is_autocommit)
+ txn_rollback();
+ return -1;
}
void
@@ -299,8 +330,12 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row)
xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
if (request.type != IPROTO_NOP) {
struct space *space = space_cache_find_xc(request.space_id);
- if (box_process_rw(&request, space, NULL) != 0) {
+ struct txn *txn = txn_begin();
+ if (txn == NULL || box_process_rw(&request, space, NULL) != 0 ||
+ txn_commit(txn) != 0) {
say_error("error applying row: %s", request_str(&request));
+ if (txn != NULL)
+ txn_rollback();
diag_raise();
}
}
@@ -1055,7 +1090,7 @@ box_select(uint32_t space_id, uint32_t index_id,
struct iterator *it = index_create_iterator(index, type,
key, part_count);
if (it == NULL) {
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return -1;
}
@@ -1080,7 +1115,7 @@ box_select(uint32_t space_id, uint32_t index_id,
if (rc != 0) {
port_destroy(port);
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return -1;
}
txn_commit_ro_stmt(txn);
@@ -1313,8 +1348,15 @@ box_sequence_reset(uint32_t seq_id)
static inline void
box_register_replica(uint32_t id, const struct tt_uuid *uuid)
{
+ struct txn *txn = txn_begin();
+ if (txn == NULL)
+ diag_raise();
if (boxk(IPROTO_INSERT, BOX_CLUSTER_ID, "[%u%s]",
- (unsigned) id, tt_uuid_str(uuid)) != 0)
+ (unsigned) id, tt_uuid_str(uuid)) != 0) {
+ txn_rollback();
+ diag_raise();
+ }
+ if (txn_commit(txn) != 0)
diag_raise();
assert(replica_by_uuid(uuid)->id == id);
}
@@ -1636,9 +1678,16 @@ box_set_replicaset_uuid(const struct tt_uuid *replicaset_uuid)
uu = *replicaset_uuid;
else
tt_uuid_create(&uu);
+ struct txn *txn = txn_begin();
+ if (txn == NULL)
+ diag_raise();
/* Save replica set UUID in _schema */
if (boxk(IPROTO_INSERT, BOX_SCHEMA_ID, "[%s%s]", "cluster",
- tt_uuid_str(&uu)))
+ tt_uuid_str(&uu))) {
+ txn_rollback();
+ diag_raise();
+ }
+ if (txn_commit(txn) != 0)
diag_raise();
}
diff --git a/src/box/index.cc b/src/box/index.cc
index 4a444e5d0..7f26c9bc2 100644
--- a/src/box/index.cc
+++ b/src/box/index.cc
@@ -240,7 +240,7 @@ box_index_get(uint32_t space_id, uint32_t index_id, const char *key,
if (txn_begin_ro_stmt(space, &txn) != 0)
return -1;
if (index_get(index, key, part_count, result) != 0) {
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return -1;
}
txn_commit_ro_stmt(txn);
@@ -274,7 +274,7 @@ box_index_min(uint32_t space_id, uint32_t index_id, const char *key,
if (txn_begin_ro_stmt(space, &txn) != 0)
return -1;
if (index_min(index, key, part_count, result) != 0) {
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return -1;
}
txn_commit_ro_stmt(txn);
@@ -306,7 +306,7 @@ box_index_max(uint32_t space_id, uint32_t index_id, const char *key,
if (txn_begin_ro_stmt(space, &txn) != 0)
return -1;
if (index_max(index, key, part_count, result) != 0) {
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return -1;
}
txn_commit_ro_stmt(txn);
@@ -340,7 +340,7 @@ box_index_count(uint32_t space_id, uint32_t index_id, int type,
return -1;
ssize_t count = index_count(index, itype, key, part_count);
if (count < 0) {
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return -1;
}
txn_commit_ro_stmt(txn);
@@ -377,7 +377,7 @@ box_index_iterator(uint32_t space_id, uint32_t index_id, int type,
struct iterator *it = index_create_iterator(index, itype,
key, part_count);
if (it == NULL) {
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return NULL;
}
txn_commit_ro_stmt(txn);
diff --git a/src/box/journal.h b/src/box/journal.h
index 8ac32ee5e..77d3073c8 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -32,6 +32,7 @@
*/
#include <stdint.h>
#include <stdbool.h>
+#include "trigger.h"
#include "salad/stailq.h"
#if defined(__cplusplus)
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index f4312484a..149215b87 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -272,16 +272,22 @@ memtx_engine_recover_snapshot_row(struct memtx_engine *memtx,
diag_set(ClientError, ER_CROSS_ENGINE_TRANSACTION);
return -1;
}
+ struct txn *txn = txn_begin();
+ if (txn == NULL)
+ return -1;
/* no access checks here - applier always works with admin privs */
- if (space_apply_initial_join_row(space, &request) != 0)
+ if (space_apply_initial_join_row(space, &request) != 0) {
+ txn_rollback();
return -1;
+ }
+ int rc = txn_commit(txn);
/*
* Don't let gc pool grow too much. Yet to
* it before reading the next row, to make
* sure it's not freed along here.
*/
fiber_gc();
- return 0;
+ return rc;
}
/** Called at start to tell memtx to recover to a given LSN. */
diff --git a/src/box/memtx_space.c b/src/box/memtx_space.c
index 5ddb4f7ee..829ab7f4c 100644
--- a/src/box/memtx_space.c
+++ b/src/box/memtx_space.c
@@ -310,10 +310,10 @@ memtx_space_apply_initial_join_row(struct space *space, struct request *request)
return -1;
}
request->header->replica_id = 0;
- struct txn *txn = txn_begin_stmt(space);
- if (txn == NULL)
+ struct txn *txn = in_txn();
+ struct txn_stmt *stmt = txn_begin_stmt(txn, space);
+ if (stmt == NULL)
return -1;
- struct txn_stmt *stmt = txn_current_stmt(txn);
stmt->new_tuple = memtx_tuple_new(space->format, request->tuple,
request->tuple_end);
if (stmt->new_tuple == NULL)
@@ -326,7 +326,7 @@ memtx_space_apply_initial_join_row(struct space *space, struct request *request)
rollback:
say_error("rollback: %s", diag_last_error(diag_get())->errmsg);
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return -1;
}
diff --git a/src/box/sql.c b/src/box/sql.c
index fbfa59992..1bdbdb815 100644
--- a/src/box/sql.c
+++ b/src/box/sql.c
@@ -915,7 +915,7 @@ cursor_seek(BtCursor *pCur, int *pRes)
part_count);
if (it == NULL) {
if (txn != NULL)
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
pCur->eState = CURSOR_INVALID;
return SQL_TARANTOOL_ITERATOR_FAIL;
}
diff --git a/src/box/txn.c b/src/box/txn.c
index da749d7cc..87fd8b3bc 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -174,7 +174,7 @@ txn_free(struct txn *txn)
}
struct txn *
-txn_begin(bool is_autocommit)
+txn_begin()
{
static int64_t tsn = 0;
assert(! in_txn());
@@ -187,7 +187,6 @@ txn_begin(bool is_autocommit)
txn->n_new_rows = 0;
txn->n_local_rows = 0;
txn->n_applier_rows = 0;
- txn->is_autocommit = is_autocommit;
txn->has_triggers = false;
txn->is_aborted = false;
txn->in_sub_stmt = 0;
@@ -220,26 +219,20 @@ txn_begin_in_engine(struct engine *engine, struct txn *txn)
return 0;
}
-struct txn *
-txn_begin_stmt(struct space *space)
+struct txn_stmt *
+txn_begin_stmt(struct txn *txn, struct space *space)
{
- struct txn *txn = in_txn();
- if (txn == NULL) {
- txn = txn_begin(true);
- if (txn == NULL)
- return NULL;
- } else if (txn->in_sub_stmt > TXN_SUB_STMT_MAX) {
+ assert(txn == in_txn());
+ assert(txn != NULL);
+ if (txn->in_sub_stmt > TXN_SUB_STMT_MAX) {
diag_set(ClientError, ER_SUB_STMT_MAX);
return NULL;
}
struct txn_stmt *stmt = txn_stmt_new(txn);
- if (stmt == NULL) {
- if (txn->is_autocommit && txn->in_sub_stmt == 0)
- txn_rollback();
+ if (stmt == NULL)
return NULL;
- }
if (space == NULL)
- return txn;
+ return stmt;
if (trigger_run(&space->on_stmt_begin, txn) != 0)
goto fail;
@@ -252,9 +245,9 @@ txn_begin_stmt(struct space *space)
if (engine_begin_statement(engine, txn) != 0)
goto fail;
- return txn;
+ return stmt;
fail:
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return NULL;
}
@@ -272,8 +265,7 @@ txn_is_distributed(struct txn *txn)
}
/**
- * End a statement. In autocommit mode, end
- * the current transaction as well.
+ * End a statement.
*/
int
txn_commit_stmt(struct txn *txn, struct request *request)
@@ -316,14 +308,9 @@ txn_commit_stmt(struct txn *txn, struct request *request)
goto fail;
}
--txn->in_sub_stmt;
- if (txn->is_autocommit && txn->in_sub_stmt == 0) {
- int rc = txn_commit(txn);
- fiber_gc();
- return rc;
- }
return 0;
fail:
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return -1;
}
@@ -359,7 +346,7 @@ txn_write_to_wal(struct txn *txn)
if (res < 0) {
/* Cascading rollback. */
- txn_rollback(); /* Perform our part of cascading rollback. */
+ txn_rollback(txn); /* Perform our part of cascading rollback. */
/*
* Move fiber to end of event loop to avoid
* execution of any new requests before all
@@ -441,14 +428,11 @@ fail:
}
void
-txn_rollback_stmt()
+txn_rollback_stmt(struct txn *txn)
{
- struct txn *txn = in_txn();
if (txn == NULL || txn->in_sub_stmt == 0)
return;
txn->in_sub_stmt--;
- if (txn->is_autocommit && txn->in_sub_stmt == 0)
- return txn_rollback();
txn_rollback_to_svp(txn, txn->sub_stmt_begin[txn->in_sub_stmt]);
}
@@ -465,6 +449,7 @@ txn_rollback()
unreachable();
panic("rollback trigger failed");
}
+
if (txn->engine)
engine_rollback(txn->engine, txn);
@@ -519,7 +504,7 @@ box_txn_begin()
diag_set(ClientError, ER_ACTIVE_TRANSACTION);
return -1;
}
- if (txn_begin(false) == NULL)
+ if (txn_begin() == NULL)
return -1;
return 0;
}
diff --git a/src/box/txn.h b/src/box/txn.h
index f4d861824..5a66f8e53 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -162,11 +162,6 @@ struct txn {
* already assigned LSN.
*/
int n_applier_rows;
- /**
- * True if this transaction is running in autocommit mode
- * (statement end causes an automatic transaction commit).
- */
- bool is_autocommit;
/**
* True if the transaction was aborted so should be
* rolled back at commit.
@@ -214,7 +209,7 @@ in_txn()
* @pre no transaction is active
*/
struct txn *
-txn_begin(bool is_autocommit);
+txn_begin();
/**
* Commit a transaction.
@@ -271,11 +266,10 @@ txn_on_rollback(struct txn *txn, struct trigger *trigger)
}
/**
- * Start a new statement. If no current transaction,
- * start a new transaction with autocommit = true.
+ * Start a new statement.
*/
-struct txn *
-txn_begin_stmt(struct space *space);
+struct txn_stmt *
+txn_begin_stmt(struct txn *txn, struct space *space);
int
txn_begin_in_engine(struct engine *engine, struct txn *txn);
@@ -334,7 +328,7 @@ txn_commit_stmt(struct txn *txn, struct request *request);
* rolls back the entire transaction.
*/
void
-txn_rollback_stmt();
+txn_rollback_stmt(struct txn *txn);
/**
* Raise an error if this is a multi-statement transaction: DDL
diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index 9372e5f7f..2abfab366 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -2415,10 +2415,8 @@ vinyl_engine_begin(struct engine *engine, struct txn *txn)
txn->engine_tx = vy_tx_begin(env->xm);
if (txn->engine_tx == NULL)
return -1;
- if (!txn->is_autocommit) {
- trigger_create(&txn->fiber_on_stop, txn_on_stop, NULL, NULL);
- trigger_add(&fiber()->on_stop, &txn->fiber_on_stop);
- }
+ trigger_create(&txn->fiber_on_stop, txn_on_stop, NULL, NULL);
+ trigger_add(&fiber()->on_stop, &txn->fiber_on_stop);
return 0;
}
@@ -2488,8 +2486,7 @@ vinyl_engine_commit(struct engine *engine, struct txn *txn)
vy_regulator_check_dump_watermark(&env->regulator);
txn->engine_tx = NULL;
- if (!txn->is_autocommit)
- trigger_clear(&txn->fiber_on_stop);
+ trigger_clear(&txn->fiber_on_stop);
}
static void
@@ -2503,8 +2500,7 @@ vinyl_engine_rollback(struct engine *engine, struct txn *txn)
vy_tx_rollback(tx);
txn->engine_tx = NULL;
- if (!txn->is_autocommit)
- trigger_clear(&txn->fiber_on_stop);
+ trigger_clear(&txn->fiber_on_stop);
}
static int
diff --git a/src/box/vy_scheduler.c b/src/box/vy_scheduler.c
index fabb4bb48..9d5f18e32 100644
--- a/src/box/vy_scheduler.c
+++ b/src/box/vy_scheduler.c
@@ -840,14 +840,14 @@ vy_deferred_delete_process_one(struct space *deferred_delete_space,
tuple_unref(delete);
- struct txn *txn = txn_begin_stmt(deferred_delete_space);
- if (txn == NULL)
+ struct txn *txn = in_txn();
+ if (txn_begin_stmt(txn, deferred_delete_space) == NULL)
return -1;
struct tuple *unused;
if (space_execute_dml(deferred_delete_space, txn,
&request, &unused) != 0) {
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return -1;
}
return txn_commit_stmt(txn, &request);
diff --git a/src/box/wal.c b/src/box/wal.c
index 0ea15a432..628e7529a 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -303,7 +303,6 @@ tx_schedule_rollback(struct cmsg *msg)
* in-memory database state.
*/
stailq_reverse(&writer->rollback);
- /* Must not yield. */
tx_schedule_queue(&writer->rollback);
stailq_create(&writer->rollback);
if (msg != &writer->in_rollback)
--
2.21.0
More information about the Tarantool-patches
mailing list