[tarantool-patches] [PATCH v5 2/7] txn: get rid of autocommit from a txn structure

Georgy Kirichenko georgy at tarantool.org
Sat Jun 22 00:48:16 MSK 2019


Move transaction auto-start and auto-commit behavior to the box level.
As of this patch, a transaction no longer starts or commits
automatically without explicit txn_begin/txn_commit invocations. This
is part of a bigger transaction refactoring needed to implement
detachable transactions and a parallel applier.

Prerequisites: #1254
---
 src/box/applier.cc     | 43 ++++++++++++++++++++++----
 src/box/box.cc         | 70 +++++++++++++++++++++++++++---------------
 src/box/index.cc       | 10 +++---
 src/box/memtx_engine.c | 10 ++++--
 src/box/memtx_space.c  |  6 ++--
 src/box/sql.c          |  2 +-
 src/box/txn.c          | 52 +++++++++++--------------------
 src/box/txn.h          | 16 +++-------
 src/box/vy_scheduler.c |  6 ++--
 9 files changed, 126 insertions(+), 89 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 373e1feb9..d12a835d0 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -172,11 +172,26 @@ applier_writer_f(va_list ap)
 static int
 apply_initial_join_row(struct xrow_header *row)
 {
+	struct txn *txn = txn_begin();
+	if (txn == NULL)
+		return -1;
 	struct request request;
 	xrow_decode_dml(row, &request, dml_request_key_map(row->type));
-	struct space *space = space_cache_find_xc(request.space_id);
+	struct space *space = space_cache_find(request.space_id);
+	if (space == NULL)
+		goto rollback;
 	/* no access checks here - applier always works with admin privs */
-	return space_apply_initial_join_row(space, &request);
+	if (space_apply_initial_join_row(space, &request))
+		goto rollback;
+	int rc;
+	rc = txn_commit(txn);
+	if (rc < 0)
+		return -1;
+	fiber_gc();
+	return rc;
+rollback:
+	txn_rollback();
+	return -1;
 }
 
 /**
@@ -189,8 +204,8 @@ static int
 process_nop(struct request *request)
 {
 	assert(request->type == IPROTO_NOP);
-	struct txn *txn = txn_begin_stmt(NULL);
-	if (txn == NULL)
+	struct txn *txn = in_txn();
+	if (txn_begin_stmt(txn, NULL) != 0)
 		return -1;
 	return txn_commit_stmt(txn, request);
 }
@@ -213,6 +228,22 @@ apply_row(struct xrow_header *row)
 	return 0;
 }
 
+static int
+apply_final_join_row(struct xrow_header *row)
+{
+	struct txn *txn = txn_begin();
+	if (txn == NULL)
+		return -1;
+	if (apply_row(row) != 0) {
+		txn_rollback();
+		return -1;
+	}
+	if (txn_commit(txn) != 0)
+		return -1;
+	fiber_gc();
+	return 0;
+}
+
 /**
  * Connect to a remote host and authenticate the client.
  */
@@ -403,7 +434,7 @@ applier_join(struct applier *applier)
 		applier->last_row_time = ev_monotonic_now(loop());
 		if (iproto_type_is_dml(row.type)) {
 			vclock_follow_xrow(&replicaset.vclock, &row);
-			if (apply_row(&row) != 0)
+			if (apply_final_join_row(&row) != 0)
 				diag_raise();
 			if (++row_count % 100000 == 0)
 				say_info("%.1fM rows received", row_count / 1e6);
@@ -555,7 +586,7 @@ applier_apply_tx(struct stailq *rows)
 	 * conflict safely access failed xrow object and allocate
 	 * IPROTO_NOP on gc.
 	 */
-	struct txn *txn = txn_begin(false);
+	struct txn *txn = txn_begin();
 	struct applier_tx_row *item;
 	if (txn == NULL)
 		diag_raise();
diff --git a/src/box/box.cc b/src/box/box.cc
index 57419ee01..a32f1ba0f 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -169,34 +169,56 @@ int
 box_process_rw(struct request *request, struct space *space,
 	       struct tuple **result)
 {
+	struct tuple *tuple = NULL;
+	bool return_tuple = false;
+	struct txn *txn = in_txn();
+	bool is_autocommit = txn == NULL;
+	if (is_autocommit && (txn = txn_begin()) == NULL)
+		return -1;
 	assert(iproto_type_is_dml(request->type));
 	rmean_collect(rmean_box, request->type, 1);
 	if (access_check_space(space, PRIV_W) != 0)
-		return -1;
-	struct txn *txn = txn_begin_stmt(space);
-	if (txn == NULL)
-		return -1;
-	struct tuple *tuple;
+		goto rollback;
+	if (txn_begin_stmt(txn, space) != 0)
+		goto rollback;
 	if (space_execute_dml(space, txn, request, &tuple) != 0) {
-		txn_rollback_stmt();
-		return -1;
+		txn_rollback_stmt(txn);
+		goto rollback;
 	}
-	if (result == NULL)
-		return txn_commit_stmt(txn, request);
-	*result = tuple;
-	if (tuple == NULL)
-		return txn_commit_stmt(txn, request);
-	/*
-	 * Pin the tuple locally before the commit,
-	 * otherwise it may go away during yield in
-	 * when WAL is written in autocommit mode.
-	 */
-	tuple_ref(tuple);
-	int rc = txn_commit_stmt(txn, request);
-	if (rc == 0)
+	if (result != NULL)
+		*result = tuple;
+
+	return_tuple = result != NULL && tuple != NULL;
+	if (return_tuple) {
+		/*
+		 * Pin the tuple locally before the commit,
+		 * otherwise it may go away during yield in
+		 * when WAL is written in autocommit mode.
+		 */
+		tuple_ref(tuple);
+	}
+
+	if (txn_commit_stmt(txn, request))
+		goto rollback;
+
+	if (is_autocommit) {
+		if (txn_commit(txn) != 0)
+			goto error;
+	        fiber_gc();
+	}
+	if (return_tuple) {
 		tuple_bless(tuple);
-	tuple_unref(tuple);
-	return rc;
+		tuple_unref(tuple);
+	}
+	return 0;
+
+rollback:
+	if (is_autocommit)
+		txn_rollback();
+error:
+	if (return_tuple)
+		tuple_unref(tuple);
+	return -1;
 }
 
 void
@@ -1055,7 +1077,7 @@ box_select(uint32_t space_id, uint32_t index_id,
 	struct iterator *it = index_create_iterator(index, type,
 						    key, part_count);
 	if (it == NULL) {
-		txn_rollback_stmt();
+		txn_rollback_stmt(txn);
 		return -1;
 	}
 
@@ -1080,7 +1102,7 @@ box_select(uint32_t space_id, uint32_t index_id,
 
 	if (rc != 0) {
 		port_destroy(port);
-		txn_rollback_stmt();
+		txn_rollback_stmt(txn);
 		return -1;
 	}
 	txn_commit_ro_stmt(txn);
diff --git a/src/box/index.cc b/src/box/index.cc
index 4a444e5d0..7f26c9bc2 100644
--- a/src/box/index.cc
+++ b/src/box/index.cc
@@ -240,7 +240,7 @@ box_index_get(uint32_t space_id, uint32_t index_id, const char *key,
 	if (txn_begin_ro_stmt(space, &txn) != 0)
 		return -1;
 	if (index_get(index, key, part_count, result) != 0) {
-		txn_rollback_stmt();
+		txn_rollback_stmt(txn);
 		return -1;
 	}
 	txn_commit_ro_stmt(txn);
@@ -274,7 +274,7 @@ box_index_min(uint32_t space_id, uint32_t index_id, const char *key,
 	if (txn_begin_ro_stmt(space, &txn) != 0)
 		return -1;
 	if (index_min(index, key, part_count, result) != 0) {
-		txn_rollback_stmt();
+		txn_rollback_stmt(txn);
 		return -1;
 	}
 	txn_commit_ro_stmt(txn);
@@ -306,7 +306,7 @@ box_index_max(uint32_t space_id, uint32_t index_id, const char *key,
 	if (txn_begin_ro_stmt(space, &txn) != 0)
 		return -1;
 	if (index_max(index, key, part_count, result) != 0) {
-		txn_rollback_stmt();
+		txn_rollback_stmt(txn);
 		return -1;
 	}
 	txn_commit_ro_stmt(txn);
@@ -340,7 +340,7 @@ box_index_count(uint32_t space_id, uint32_t index_id, int type,
 		return -1;
 	ssize_t count = index_count(index, itype, key, part_count);
 	if (count < 0) {
-		txn_rollback_stmt();
+		txn_rollback_stmt(txn);
 		return -1;
 	}
 	txn_commit_ro_stmt(txn);
@@ -377,7 +377,7 @@ box_index_iterator(uint32_t space_id, uint32_t index_id, int type,
 	struct iterator *it = index_create_iterator(index, itype,
 						    key, part_count);
 	if (it == NULL) {
-		txn_rollback_stmt();
+		txn_rollback_stmt(txn);
 		return NULL;
 	}
 	txn_commit_ro_stmt(txn);
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index cd763e547..dae9955b2 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -269,16 +269,22 @@ memtx_engine_recover_snapshot_row(struct memtx_engine *memtx,
 		diag_set(ClientError, ER_CROSS_ENGINE_TRANSACTION);
 		return -1;
 	}
+	struct txn *txn = txn_begin();
+	if (txn == NULL)
+		return -1;
 	/* no access checks here - applier always works with admin privs */
-	if (space_apply_initial_join_row(space, &request) != 0)
+	if (space_apply_initial_join_row(space, &request) != 0) {
+		txn_rollback();
 		return -1;
+	}
+	int rc = txn_commit(txn);
 	/*
 	 * Don't let gc pool grow too much. Yet to
 	 * it before reading the next row, to make
 	 * sure it's not freed along here.
 	 */
 	fiber_gc();
-	return 0;
+	return rc;
 }
 
 /** Called at start to tell memtx to recover to a given LSN. */
diff --git a/src/box/memtx_space.c b/src/box/memtx_space.c
index 78a0059a0..77a8f7d5f 100644
--- a/src/box/memtx_space.c
+++ b/src/box/memtx_space.c
@@ -325,8 +325,8 @@ memtx_space_apply_initial_join_row(struct space *space, struct request *request)
 		return -1;
 	}
 	request->header->replica_id = 0;
-	struct txn *txn = txn_begin_stmt(space);
-	if (txn == NULL)
+	struct txn *txn = in_txn();
+	if (txn_begin_stmt(txn, space) != 0)
 		return -1;
 	struct txn_stmt *stmt = txn_current_stmt(txn);
 	stmt->new_tuple = memtx_tuple_new(space->format, request->tuple,
@@ -341,7 +341,7 @@ memtx_space_apply_initial_join_row(struct space *space, struct request *request)
 
 rollback:
 	say_error("rollback: %s", diag_last_error(diag_get())->errmsg);
-	txn_rollback_stmt();
+	txn_rollback_stmt(txn);
 	return -1;
 }
 
diff --git a/src/box/sql.c b/src/box/sql.c
index a0350da6b..4c9a4c15b 100644
--- a/src/box/sql.c
+++ b/src/box/sql.c
@@ -862,7 +862,7 @@ cursor_seek(BtCursor *pCur, int *pRes)
 				      part_count);
 	if (it == NULL) {
 		if (txn != NULL)
-			txn_rollback_stmt();
+			txn_rollback_stmt(txn);
 		pCur->eState = CURSOR_INVALID;
 		return -1;
 	}
diff --git a/src/box/txn.c b/src/box/txn.c
index 9aa460f50..583ae6ce0 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -178,7 +178,7 @@ txn_free(struct txn *txn)
 }
 
 struct txn *
-txn_begin(bool is_autocommit)
+txn_begin()
 {
 	static int64_t tsn = 0;
 	assert(! in_txn());
@@ -191,7 +191,6 @@ txn_begin(bool is_autocommit)
 	txn->n_new_rows = 0;
 	txn->n_local_rows = 0;
 	txn->n_applier_rows = 0;
-	txn->is_autocommit = is_autocommit;
 	txn->has_triggers  = false;
 	txn->is_aborted = false;
 	txn->in_sub_stmt = 0;
@@ -226,26 +225,20 @@ txn_begin_in_engine(struct engine *engine, struct txn *txn)
 	return 0;
 }
 
-struct txn *
-txn_begin_stmt(struct space *space)
+int
+txn_begin_stmt(struct txn *txn, struct space *space)
 {
-	struct txn *txn = in_txn();
-	if (txn == NULL) {
-		txn = txn_begin(true);
-		if (txn == NULL)
-			return NULL;
-	} else if (txn->in_sub_stmt > TXN_SUB_STMT_MAX) {
+	assert(txn == in_txn());
+	assert(txn != NULL);
+	if (txn->in_sub_stmt > TXN_SUB_STMT_MAX) {
 		diag_set(ClientError, ER_SUB_STMT_MAX);
-		return NULL;
+		return -1;
 	}
 	struct txn_stmt *stmt = txn_stmt_new(txn);
-	if (stmt == NULL) {
-		if (txn->is_autocommit && txn->in_sub_stmt == 0)
-			txn_rollback();
-		return NULL;
-	}
+	if (stmt == NULL)
+		return -1;
 	if (space == NULL)
-		return txn;
+		return 0;
 
 	if (trigger_run(&space->on_stmt_begin, txn) != 0)
 		goto fail;
@@ -258,10 +251,10 @@ txn_begin_stmt(struct space *space)
 	if (engine_begin_statement(engine, txn) != 0)
 		goto fail;
 
-	return txn;
+	return 0;
 fail:
-	txn_rollback_stmt();
-	return NULL;
+	txn_rollback_stmt(txn);
+	return -1;
 }
 
 bool
@@ -278,8 +271,7 @@ txn_is_distributed(struct txn *txn)
 }
 
 /**
- * End a statement. In autocommit mode, end
- * the current transaction as well.
+ * End a statement.
  */
 int
 txn_commit_stmt(struct txn *txn, struct request *request)
@@ -339,14 +331,9 @@ txn_commit_stmt(struct txn *txn, struct request *request)
 			goto fail;
 	}
 	--txn->in_sub_stmt;
-	if (txn->is_autocommit && txn->in_sub_stmt == 0) {
-		int rc = txn_commit(txn);
-		fiber_gc();
-		return rc;
-	}
 	return 0;
 fail:
-	txn_rollback_stmt();
+	txn_rollback_stmt(txn);
 	return -1;
 }
 
@@ -382,7 +369,7 @@ txn_write_to_wal(struct txn *txn)
 
 	if (res < 0) {
 		/* Cascading rollback. */
-		txn_rollback(); /* Perform our part of cascading rollback. */
+		txn_rollback(txn); /* Perform our part of cascading rollback. */
 		/*
 		 * Move fiber to end of event loop to avoid
 		 * execution of any new requests before all
@@ -461,14 +448,11 @@ fail:
 }
 
 void
-txn_rollback_stmt()
+txn_rollback_stmt(struct txn *txn)
 {
-	struct txn *txn = in_txn();
 	if (txn == NULL || txn->in_sub_stmt == 0)
 		return;
 	txn->in_sub_stmt--;
-	if (txn->is_autocommit && txn->in_sub_stmt == 0)
-		return txn_rollback();
 	txn_rollback_to_svp(txn, txn->sub_stmt_begin[txn->in_sub_stmt]);
 }
 
@@ -536,7 +520,7 @@ box_txn_begin()
 		diag_set(ClientError, ER_ACTIVE_TRANSACTION);
 		return -1;
 	}
-	if (txn_begin(false) == NULL)
+	if (txn_begin() == NULL)
 		return -1;
 	return 0;
 }
diff --git a/src/box/txn.h b/src/box/txn.h
index f4d861824..33926f6f3 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -162,11 +162,6 @@ struct txn {
 	 * already assigned LSN.
 	 */
 	int n_applier_rows;
-	/**
-	 * True if this transaction is running in autocommit mode
-	 * (statement end causes an automatic transaction commit).
-	 */
-	bool is_autocommit;
 	/**
 	 * True if the transaction was aborted so should be
 	 * rolled back at commit.
@@ -214,7 +209,7 @@ in_txn()
  * @pre no transaction is active
  */
 struct txn *
-txn_begin(bool is_autocommit);
+txn_begin();
 
 /**
  * Commit a transaction.
@@ -271,11 +266,10 @@ txn_on_rollback(struct txn *txn, struct trigger *trigger)
 }
 
 /**
- * Start a new statement. If no current transaction,
- * start a new transaction with autocommit = true.
+ * Start a new statement.
  */
-struct txn *
-txn_begin_stmt(struct space *space);
+int
+txn_begin_stmt(struct txn *txn, struct space *space);
 
 int
 txn_begin_in_engine(struct engine *engine, struct txn *txn);
@@ -334,7 +328,7 @@ txn_commit_stmt(struct txn *txn, struct request *request);
  * rolls back the entire transaction.
  */
 void
-txn_rollback_stmt();
+txn_rollback_stmt(struct txn *txn);
 
 /**
  * Raise an error if this is a multi-statement transaction: DDL
diff --git a/src/box/vy_scheduler.c b/src/box/vy_scheduler.c
index 85c1659b0..ed8f7dd86 100644
--- a/src/box/vy_scheduler.c
+++ b/src/box/vy_scheduler.c
@@ -840,14 +840,14 @@ vy_deferred_delete_process_one(struct space *deferred_delete_space,
 
 	tuple_unref(delete);
 
-	struct txn *txn = txn_begin_stmt(deferred_delete_space);
-	if (txn == NULL)
+	struct txn *txn = in_txn();
+	if (txn_begin_stmt(txn, deferred_delete_space) != 0)
 		return -1;
 
 	struct tuple *unused;
 	if (space_execute_dml(deferred_delete_space, txn,
 			      &request, &unused) != 0) {
-		txn_rollback_stmt();
+		txn_rollback_stmt(txn);
 		return -1;
 	}
 	return txn_commit_stmt(txn, &request);
-- 
2.22.0





More information about the Tarantool-patches mailing list