[tarantool-patches] [PATCH 06/10] Require for txn in case of txn_begin_stmt/txn_rollback_stmt

Georgy Kirichenko georgy at tarantool.org
Fri Apr 19 15:44:02 MSK 2019


Pass a txn structure to tnx_begin_stmt and txn_rollback_stmt functions.

Prerequisites: #1254
---
 src/box/applier.cc     |  4 ++--
 src/box/box.cc         |  8 ++++----
 src/box/index.cc       | 10 +++++-----
 src/box/journal.c      |  1 +
 src/box/journal.h      | 16 +++++++++++++++
 src/box/memtx_space.c  |  8 ++++----
 src/box/sql.c          |  2 +-
 src/box/txn.c          | 45 ++++++++++++++++++++++++------------------
 src/box/txn.h          |  9 ++++-----
 src/box/vy_scheduler.c |  6 +++---
 src/box/wal.c          |  5 ++++-
 11 files changed, 70 insertions(+), 44 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 42b6efc4d..43b1a84bc 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -201,8 +201,8 @@ static int
 process_nop(struct request *request)
 {
 	assert(request->type == IPROTO_NOP);
-	struct txn *txn = txn_begin_stmt(NULL);
-	if (txn == NULL)
+	struct txn *txn = in_txn();
+	if (txn_begin_stmt(txn, NULL) == NULL)
 		return -1;
 	return txn_commit_stmt(txn, request);
 }
diff --git a/src/box/box.cc b/src/box/box.cc
index 835a00c95..46cd444fd 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -178,10 +178,10 @@ box_process_rw(struct request *request, struct space *space,
 	rmean_collect(rmean_box, request->type, 1);
 	if (access_check_space(space, PRIV_W) != 0)
 		goto fail;
-	if (txn_begin_stmt(space) == NULL)
+	if (txn_begin_stmt(txn, space) == NULL)
 		goto fail;
 	if (space_execute_dml(space, txn, request, &tuple) != 0) {
-		txn_rollback_stmt();
+		txn_rollback_stmt(txn);
 		goto fail;
 	}
 	/*
@@ -1087,7 +1087,7 @@ box_select(uint32_t space_id, uint32_t index_id,
 	struct iterator *it = index_create_iterator(index, type,
 						    key, part_count);
 	if (it == NULL) {
-		txn_rollback_stmt();
+		txn_rollback_stmt(txn);
 		return -1;
 	}
 
@@ -1112,7 +1112,7 @@ box_select(uint32_t space_id, uint32_t index_id,
 
 	if (rc != 0) {
 		port_destroy(port);
-		txn_rollback_stmt();
+		txn_rollback_stmt(txn);
 		return -1;
 	}
 	txn_commit_ro_stmt(txn);
diff --git a/src/box/index.cc b/src/box/index.cc
index 2817d076d..baee0c1cb 100644
--- a/src/box/index.cc
+++ b/src/box/index.cc
@@ -239,7 +239,7 @@ box_index_get(uint32_t space_id, uint32_t index_id, const char *key,
 	if (txn_begin_ro_stmt(space, &txn) != 0)
 		return -1;
 	if (index_get(index, key, part_count, result) != 0) {
-		txn_rollback_stmt();
+		txn_rollback_stmt(txn);
 		return -1;
 	}
 	txn_commit_ro_stmt(txn);
@@ -273,7 +273,7 @@ box_index_min(uint32_t space_id, uint32_t index_id, const char *key,
 	if (txn_begin_ro_stmt(space, &txn) != 0)
 		return -1;
 	if (index_min(index, key, part_count, result) != 0) {
-		txn_rollback_stmt();
+		txn_rollback_stmt(txn);
 		return -1;
 	}
 	txn_commit_ro_stmt(txn);
@@ -305,7 +305,7 @@ box_index_max(uint32_t space_id, uint32_t index_id, const char *key,
 	if (txn_begin_ro_stmt(space, &txn) != 0)
 		return -1;
 	if (index_max(index, key, part_count, result) != 0) {
-		txn_rollback_stmt();
+		txn_rollback_stmt(txn);
 		return -1;
 	}
 	txn_commit_ro_stmt(txn);
@@ -339,7 +339,7 @@ box_index_count(uint32_t space_id, uint32_t index_id, int type,
 		return -1;
 	ssize_t count = index_count(index, itype, key, part_count);
 	if (count < 0) {
-		txn_rollback_stmt();
+		txn_rollback_stmt(txn);
 		return -1;
 	}
 	txn_commit_ro_stmt(txn);
@@ -376,7 +376,7 @@ box_index_iterator(uint32_t space_id, uint32_t index_id, int type,
 	struct iterator *it = index_create_iterator(index, itype,
 						    key, part_count);
 	if (it == NULL) {
-		txn_rollback_stmt();
+		txn_rollback_stmt(txn);
 		return NULL;
 	}
 	txn_commit_ro_stmt(txn);
diff --git a/src/box/journal.c b/src/box/journal.c
index fe13fb6ee..5cffc7452 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -70,6 +70,7 @@ journal_entry_new(size_t n_rows, struct region *region)
 	entry->n_rows = n_rows;
 	entry->res = -1;
 	entry->fiber = fiber();
+	rlist_create(&entry->on_error);
 	return entry;
 }
 
diff --git a/src/box/journal.h b/src/box/journal.h
index 8ac32ee5e..94be0b007 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -32,6 +32,7 @@
  */
 #include <stdint.h>
 #include <stdbool.h>
+#include "trigger.h"
 #include "salad/stailq.h"
 
 #if defined(__cplusplus)
@@ -58,6 +59,12 @@ struct journal_entry {
 	 * The fiber issuing the request.
 	 */
 	struct fiber *fiber;
+	/**
+	 * A trigger list to call if write failed. Triggers are going to be
+	 * fired before any other processing and are a good place to implement
+	 * any rollback non-yielding behavior.
+	 */
+	struct rlist on_error;
 	/**
 	 * Approximate size of this request when encoded.
 	 */
@@ -82,6 +89,15 @@ struct region;
 struct journal_entry *
 journal_entry_new(size_t n_rows, struct region *region);
 
+/**
+ * Add an on_error trigger to a journal entry.
+ */
+static inline void
+journal_entry_on_error(struct journal_entry *entry, struct trigger *trigger)
+{
+	trigger_add(&entry->on_error, trigger);
+}
+
 /**
  * An API for an abstract journal for all transactions of this
  * instance, as well as for multiple instances in case of
diff --git a/src/box/memtx_space.c b/src/box/memtx_space.c
index a28204d30..f2b28fb71 100644
--- a/src/box/memtx_space.c
+++ b/src/box/memtx_space.c
@@ -310,10 +310,10 @@ memtx_space_apply_initial_join_row(struct space *space, struct request *request)
 		return -1;
 	}
 	request->header->replica_id = 0;
-	struct txn *txn = txn_begin_stmt(space);
-	if (txn == NULL)
+	struct txn *txn = in_txn();
+	struct txn_stmt *stmt = txn_begin_stmt(txn, space);
+	if (stmt == NULL)
 		return -1;
-	struct txn_stmt *stmt = txn_current_stmt(txn);
 	stmt->new_tuple = memtx_tuple_new(space->format, request->tuple,
 					  request->tuple_end);
 	if (stmt->new_tuple == NULL)
@@ -326,7 +326,7 @@ memtx_space_apply_initial_join_row(struct space *space, struct request *request)
 
 rollback:
 	say_error("rollback: %s", diag_last_error(diag_get())->errmsg);
-	txn_rollback_stmt();
+	txn_rollback_stmt(txn);
 	return -1;
 }
 
diff --git a/src/box/sql.c b/src/box/sql.c
index 1fb93e106..0051b93c0 100644
--- a/src/box/sql.c
+++ b/src/box/sql.c
@@ -911,7 +911,7 @@ cursor_seek(BtCursor *pCur, int *pRes)
 				      part_count);
 	if (it == NULL) {
 		if (txn != NULL)
-			txn_rollback_stmt();
+			txn_rollback_stmt(txn);
 		pCur->eState = CURSOR_INVALID;
 		return SQL_TARANTOOL_ITERATOR_FAIL;
 	}
diff --git a/src/box/txn.c b/src/box/txn.c
index fdfffa144..8f5b66480 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -218,14 +218,10 @@ txn_begin_in_engine(struct engine *engine, struct txn *txn)
 	return 0;
 }
 
-struct txn *
-txn_begin_stmt(struct space *space)
+struct txn_stmt *
+txn_begin_stmt(struct txn *txn, struct space *space)
 {
-	struct txn *txn = in_txn();
-	if (txn == NULL) {
-		diag_set(ClientError, ER_NO_TRANSACTION);
-		return NULL;
-	} else if (txn->in_sub_stmt > TXN_SUB_STMT_MAX) {
+	if (txn->in_sub_stmt > TXN_SUB_STMT_MAX) {
 		diag_set(ClientError, ER_SUB_STMT_MAX);
 		return NULL;
 	}
@@ -233,7 +229,7 @@ txn_begin_stmt(struct space *space)
 	if (stmt == NULL)
 		return NULL;
 	if (space == NULL)
-		return txn;
+		return stmt;
 
 	if (trigger_run(&space->on_stmt_begin, txn) != 0)
 		goto fail;
@@ -246,9 +242,9 @@ txn_begin_stmt(struct space *space)
 	if (engine_begin_statement(engine, txn) != 0)
 		goto fail;
 
-	return txn;
+	return stmt;
 fail:
-	txn_rollback_stmt();
+	txn_rollback_stmt(txn);
 	return NULL;
 }
 
@@ -311,10 +307,23 @@ txn_commit_stmt(struct txn *txn, struct request *request)
 	--txn->in_sub_stmt;
 	return 0;
 fail:
-	txn_rollback_stmt();
+	txn_rollback_stmt(txn);
 	return -1;
 }
 
+/*
+ * Callback called if journal write failed.
+ */
+static void
+journal_write_error_cb(struct trigger *trigger, void *event)
+{
+	(void) event;
+	struct txn *txn = (struct txn *)trigger->data;
+	if (txn->engine)
+		engine_rollback(txn->engine, txn);
+	txn->engine = NULL;
+}
+
 static int64_t
 txn_write_to_wal(struct txn *txn)
 {
@@ -326,6 +335,10 @@ txn_write_to_wal(struct txn *txn)
 	if (req == NULL)
 		return -1;
 
+	struct trigger on_error;
+	trigger_create(&on_error, journal_write_error_cb, txn, NULL);
+	journal_entry_on_error(req, &on_error);
+
 	struct txn_stmt *stmt;
 	struct xrow_header **remote_row = req->rows;
 	struct xrow_header **local_row = req->rows + txn->n_applier_rows;
@@ -348,12 +361,6 @@ txn_write_to_wal(struct txn *txn)
 	if (res < 0) {
 		/* Cascading rollback. */
 		txn_rollback(txn); /* Perform our part of cascading rollback. */
-		/*
-		 * Move fiber to end of event loop to avoid
-		 * execution of any new requests before all
-		 * pending rollbacks are processed.
-		 */
-		fiber_reschedule();
 		diag_set(ClientError, ER_WAL_IO);
 		diag_log();
 	} else if (stop - start > too_long_threshold) {
@@ -429,9 +436,8 @@ fail:
 }
 
 void
-txn_rollback_stmt()
+txn_rollback_stmt(struct txn *txn)
 {
-	struct txn *txn = in_txn();
 	if (txn == NULL || txn->in_sub_stmt == 0)
 		return;
 	txn->in_sub_stmt--;
@@ -449,6 +455,7 @@ txn_rollback(struct txn *txn)
 		unreachable();
 		panic("rollback trigger failed");
 	}
+
 	if (txn->engine)
 		engine_rollback(txn->engine, txn);
 
diff --git a/src/box/txn.h b/src/box/txn.h
index ae2d7b9a5..96719638b 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -261,11 +261,10 @@ txn_on_rollback(struct txn *txn, struct trigger *trigger)
 }
 
 /**
- * Start a new statement. If no current transaction,
- * start a new transaction with autocommit = true.
+ * Start a new statement.
  */
-struct txn *
-txn_begin_stmt(struct space *space);
+struct txn_stmt *
+txn_begin_stmt(struct txn *txn, struct space *space);
 
 int
 txn_begin_in_engine(struct engine *engine, struct txn *txn);
@@ -324,7 +323,7 @@ txn_commit_stmt(struct txn *txn, struct request *request);
  * rolls back the entire transaction.
  */
 void
-txn_rollback_stmt();
+txn_rollback_stmt(struct txn *txn);
 
 /**
  * Raise an error if this is a multi-statement transaction: DDL
diff --git a/src/box/vy_scheduler.c b/src/box/vy_scheduler.c
index 1f6b30f4a..053558c8c 100644
--- a/src/box/vy_scheduler.c
+++ b/src/box/vy_scheduler.c
@@ -840,14 +840,14 @@ vy_deferred_delete_process_one(struct space *deferred_delete_space,
 
 	tuple_unref(delete);
 
-	struct txn *txn = txn_begin_stmt(deferred_delete_space);
-	if (txn == NULL)
+	struct txn *txn = in_txn();
+	if (txn_begin_stmt(txn, deferred_delete_space) == NULL)
 		return -1;
 
 	struct tuple *unused;
 	if (space_execute_dml(deferred_delete_space, txn,
 			      &request, &unused) != 0) {
-		txn_rollback_stmt();
+		txn_rollback_stmt(txn);
 		return -1;
 	}
 	return txn_commit_stmt(txn, &request);
diff --git a/src/box/wal.c b/src/box/wal.c
index ad8ff7c62..8dfa3ef27 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -300,7 +300,10 @@ tx_schedule_rollback(struct cmsg *msg)
 	 * in-memory database state.
 	 */
 	stailq_reverse(&writer->rollback);
-	/* Must not yield. */
+	/* Call error callback for each request before any scheduling. */
+	struct journal_entry *req;
+	stailq_foreach_entry(req, &writer->rollback, fifo)
+		trigger_run(&req->on_error, NULL);
 	tx_schedule_queue(&writer->rollback);
 	stailq_create(&writer->rollback);
 }
-- 
2.21.0





More information about the Tarantool-patches mailing list