[tarantool-patches] [PATCH 06/10] Require for txn in case of txn_begin_stmt/txn_rollback_stmt
Georgy Kirichenko
georgy at tarantool.org
Fri Apr 19 15:44:02 MSK 2019
Pass a txn structure to tnx_begin_stmt and txn_rollback_stmt functions.
Prerequisites: #1254
---
src/box/applier.cc | 4 ++--
src/box/box.cc | 8 ++++----
src/box/index.cc | 10 +++++-----
src/box/journal.c | 1 +
src/box/journal.h | 16 +++++++++++++++
src/box/memtx_space.c | 8 ++++----
src/box/sql.c | 2 +-
src/box/txn.c | 45 ++++++++++++++++++++++++------------------
src/box/txn.h | 9 ++++-----
src/box/vy_scheduler.c | 6 +++---
src/box/wal.c | 5 ++++-
11 files changed, 70 insertions(+), 44 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 42b6efc4d..43b1a84bc 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -201,8 +201,8 @@ static int
process_nop(struct request *request)
{
assert(request->type == IPROTO_NOP);
- struct txn *txn = txn_begin_stmt(NULL);
- if (txn == NULL)
+ struct txn *txn = in_txn();
+ if (txn_begin_stmt(txn, NULL) == NULL)
return -1;
return txn_commit_stmt(txn, request);
}
diff --git a/src/box/box.cc b/src/box/box.cc
index 835a00c95..46cd444fd 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -178,10 +178,10 @@ box_process_rw(struct request *request, struct space *space,
rmean_collect(rmean_box, request->type, 1);
if (access_check_space(space, PRIV_W) != 0)
goto fail;
- if (txn_begin_stmt(space) == NULL)
+ if (txn_begin_stmt(txn, space) == NULL)
goto fail;
if (space_execute_dml(space, txn, request, &tuple) != 0) {
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
goto fail;
}
/*
@@ -1087,7 +1087,7 @@ box_select(uint32_t space_id, uint32_t index_id,
struct iterator *it = index_create_iterator(index, type,
key, part_count);
if (it == NULL) {
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return -1;
}
@@ -1112,7 +1112,7 @@ box_select(uint32_t space_id, uint32_t index_id,
if (rc != 0) {
port_destroy(port);
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return -1;
}
txn_commit_ro_stmt(txn);
diff --git a/src/box/index.cc b/src/box/index.cc
index 2817d076d..baee0c1cb 100644
--- a/src/box/index.cc
+++ b/src/box/index.cc
@@ -239,7 +239,7 @@ box_index_get(uint32_t space_id, uint32_t index_id, const char *key,
if (txn_begin_ro_stmt(space, &txn) != 0)
return -1;
if (index_get(index, key, part_count, result) != 0) {
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return -1;
}
txn_commit_ro_stmt(txn);
@@ -273,7 +273,7 @@ box_index_min(uint32_t space_id, uint32_t index_id, const char *key,
if (txn_begin_ro_stmt(space, &txn) != 0)
return -1;
if (index_min(index, key, part_count, result) != 0) {
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return -1;
}
txn_commit_ro_stmt(txn);
@@ -305,7 +305,7 @@ box_index_max(uint32_t space_id, uint32_t index_id, const char *key,
if (txn_begin_ro_stmt(space, &txn) != 0)
return -1;
if (index_max(index, key, part_count, result) != 0) {
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return -1;
}
txn_commit_ro_stmt(txn);
@@ -339,7 +339,7 @@ box_index_count(uint32_t space_id, uint32_t index_id, int type,
return -1;
ssize_t count = index_count(index, itype, key, part_count);
if (count < 0) {
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return -1;
}
txn_commit_ro_stmt(txn);
@@ -376,7 +376,7 @@ box_index_iterator(uint32_t space_id, uint32_t index_id, int type,
struct iterator *it = index_create_iterator(index, itype,
key, part_count);
if (it == NULL) {
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return NULL;
}
txn_commit_ro_stmt(txn);
diff --git a/src/box/journal.c b/src/box/journal.c
index fe13fb6ee..5cffc7452 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -70,6 +70,7 @@ journal_entry_new(size_t n_rows, struct region *region)
entry->n_rows = n_rows;
entry->res = -1;
entry->fiber = fiber();
+ rlist_create(&entry->on_error);
return entry;
}
diff --git a/src/box/journal.h b/src/box/journal.h
index 8ac32ee5e..94be0b007 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -32,6 +32,7 @@
*/
#include <stdint.h>
#include <stdbool.h>
+#include "trigger.h"
#include "salad/stailq.h"
#if defined(__cplusplus)
@@ -58,6 +59,12 @@ struct journal_entry {
* The fiber issuing the request.
*/
struct fiber *fiber;
+ /**
+ * A trigger list to call if write failed. Triggers are going to be
+ * fired before any other processing and are a good place to implement
+ * any rollback non-yielding behavior.
+ */
+ struct rlist on_error;
/**
* Approximate size of this request when encoded.
*/
@@ -82,6 +89,15 @@ struct region;
struct journal_entry *
journal_entry_new(size_t n_rows, struct region *region);
+/**
+ * Add an on_error trigger to a journal entry.
+ */
+static inline void
+journal_entry_on_error(struct journal_entry *entry, struct trigger *trigger)
+{
+ trigger_add(&entry->on_error, trigger);
+}
+
/**
* An API for an abstract journal for all transactions of this
* instance, as well as for multiple instances in case of
diff --git a/src/box/memtx_space.c b/src/box/memtx_space.c
index a28204d30..f2b28fb71 100644
--- a/src/box/memtx_space.c
+++ b/src/box/memtx_space.c
@@ -310,10 +310,10 @@ memtx_space_apply_initial_join_row(struct space *space, struct request *request)
return -1;
}
request->header->replica_id = 0;
- struct txn *txn = txn_begin_stmt(space);
- if (txn == NULL)
+ struct txn *txn = in_txn();
+ struct txn_stmt *stmt = txn_begin_stmt(txn, space);
+ if (stmt == NULL)
return -1;
- struct txn_stmt *stmt = txn_current_stmt(txn);
stmt->new_tuple = memtx_tuple_new(space->format, request->tuple,
request->tuple_end);
if (stmt->new_tuple == NULL)
@@ -326,7 +326,7 @@ memtx_space_apply_initial_join_row(struct space *space, struct request *request)
rollback:
say_error("rollback: %s", diag_last_error(diag_get())->errmsg);
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return -1;
}
diff --git a/src/box/sql.c b/src/box/sql.c
index 1fb93e106..0051b93c0 100644
--- a/src/box/sql.c
+++ b/src/box/sql.c
@@ -911,7 +911,7 @@ cursor_seek(BtCursor *pCur, int *pRes)
part_count);
if (it == NULL) {
if (txn != NULL)
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
pCur->eState = CURSOR_INVALID;
return SQL_TARANTOOL_ITERATOR_FAIL;
}
diff --git a/src/box/txn.c b/src/box/txn.c
index fdfffa144..8f5b66480 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -218,14 +218,10 @@ txn_begin_in_engine(struct engine *engine, struct txn *txn)
return 0;
}
-struct txn *
-txn_begin_stmt(struct space *space)
+struct txn_stmt *
+txn_begin_stmt(struct txn *txn, struct space *space)
{
- struct txn *txn = in_txn();
- if (txn == NULL) {
- diag_set(ClientError, ER_NO_TRANSACTION);
- return NULL;
- } else if (txn->in_sub_stmt > TXN_SUB_STMT_MAX) {
+ if (txn->in_sub_stmt > TXN_SUB_STMT_MAX) {
diag_set(ClientError, ER_SUB_STMT_MAX);
return NULL;
}
@@ -233,7 +229,7 @@ txn_begin_stmt(struct space *space)
if (stmt == NULL)
return NULL;
if (space == NULL)
- return txn;
+ return stmt;
if (trigger_run(&space->on_stmt_begin, txn) != 0)
goto fail;
@@ -246,9 +242,9 @@ txn_begin_stmt(struct space *space)
if (engine_begin_statement(engine, txn) != 0)
goto fail;
- return txn;
+ return stmt;
fail:
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return NULL;
}
@@ -311,10 +307,23 @@ txn_commit_stmt(struct txn *txn, struct request *request)
--txn->in_sub_stmt;
return 0;
fail:
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return -1;
}
+/*
+ * Callback called if journal write failed.
+ */
+static void
+journal_write_error_cb(struct trigger *trigger, void *event)
+{
+ (void) event;
+ struct txn *txn = (struct txn *)trigger->data;
+ if (txn->engine)
+ engine_rollback(txn->engine, txn);
+ txn->engine = NULL;
+}
+
static int64_t
txn_write_to_wal(struct txn *txn)
{
@@ -326,6 +335,10 @@ txn_write_to_wal(struct txn *txn)
if (req == NULL)
return -1;
+ struct trigger on_error;
+ trigger_create(&on_error, journal_write_error_cb, txn, NULL);
+ journal_entry_on_error(req, &on_error);
+
struct txn_stmt *stmt;
struct xrow_header **remote_row = req->rows;
struct xrow_header **local_row = req->rows + txn->n_applier_rows;
@@ -348,12 +361,6 @@ txn_write_to_wal(struct txn *txn)
if (res < 0) {
/* Cascading rollback. */
txn_rollback(txn); /* Perform our part of cascading rollback. */
- /*
- * Move fiber to end of event loop to avoid
- * execution of any new requests before all
- * pending rollbacks are processed.
- */
- fiber_reschedule();
diag_set(ClientError, ER_WAL_IO);
diag_log();
} else if (stop - start > too_long_threshold) {
@@ -429,9 +436,8 @@ fail:
}
void
-txn_rollback_stmt()
+txn_rollback_stmt(struct txn *txn)
{
- struct txn *txn = in_txn();
if (txn == NULL || txn->in_sub_stmt == 0)
return;
txn->in_sub_stmt--;
@@ -449,6 +455,7 @@ txn_rollback(struct txn *txn)
unreachable();
panic("rollback trigger failed");
}
+
if (txn->engine)
engine_rollback(txn->engine, txn);
diff --git a/src/box/txn.h b/src/box/txn.h
index ae2d7b9a5..96719638b 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -261,11 +261,10 @@ txn_on_rollback(struct txn *txn, struct trigger *trigger)
}
/**
- * Start a new statement. If no current transaction,
- * start a new transaction with autocommit = true.
+ * Start a new statement.
*/
-struct txn *
-txn_begin_stmt(struct space *space);
+struct txn_stmt *
+txn_begin_stmt(struct txn *txn, struct space *space);
int
txn_begin_in_engine(struct engine *engine, struct txn *txn);
@@ -324,7 +323,7 @@ txn_commit_stmt(struct txn *txn, struct request *request);
* rolls back the entire transaction.
*/
void
-txn_rollback_stmt();
+txn_rollback_stmt(struct txn *txn);
/**
* Raise an error if this is a multi-statement transaction: DDL
diff --git a/src/box/vy_scheduler.c b/src/box/vy_scheduler.c
index 1f6b30f4a..053558c8c 100644
--- a/src/box/vy_scheduler.c
+++ b/src/box/vy_scheduler.c
@@ -840,14 +840,14 @@ vy_deferred_delete_process_one(struct space *deferred_delete_space,
tuple_unref(delete);
- struct txn *txn = txn_begin_stmt(deferred_delete_space);
- if (txn == NULL)
+ struct txn *txn = in_txn();
+ if (txn_begin_stmt(txn, deferred_delete_space) == NULL)
return -1;
struct tuple *unused;
if (space_execute_dml(deferred_delete_space, txn,
&request, &unused) != 0) {
- txn_rollback_stmt();
+ txn_rollback_stmt(txn);
return -1;
}
return txn_commit_stmt(txn, &request);
diff --git a/src/box/wal.c b/src/box/wal.c
index ad8ff7c62..8dfa3ef27 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -300,7 +300,10 @@ tx_schedule_rollback(struct cmsg *msg)
* in-memory database state.
*/
stailq_reverse(&writer->rollback);
- /* Must not yield. */
+ /* Call error callback for each request before any scheduling. */
+ struct journal_entry *req;
+ stailq_foreach_entry(req, &writer->rollback, fifo)
+ trigger_run(&req->on_error, NULL);
tx_schedule_queue(&writer->rollback);
stailq_create(&writer->rollback);
}
--
2.21.0
More information about the Tarantool-patches
mailing list