[Tarantool-patches] [PATCH 2/3] txn: split complete into success and fail paths

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Sat Oct 31 21:01:41 MSK 2020


txn_complete used to handle all the transaction outcomes:
- manual rollback;
- error at WAL write;
- successful WAL write and commit;
- successful WAL write and wait for synchronization with replicas.

The code became a mess after synchronous replication was
introduced. This patch splits txn_complete's code into multiple
pieces.

Now WAL write success and fail are handled by
txn_on_journal_write() exclusively. It also runs the WAL write
triggers. It was very strange to call them from txn_complete().

txn_on_journal_write() also checks if the transaction is
synchronous, and if it is not, it completes it with
txn_complete_success() whose code is simple now, and only works
on committing the changes.

In case of fail the transaction always ends up in
txn_complete_fail().

These success and fail functions are now used by the limbo as
well. It appeared all the places finishing a transaction always
know if they want to fail it or complete successfully.

This should also remove a few ifs from the hot code of transaction
commit.

The patch simplifies the code in order to fix the false warning
about too long WAL write for synchronous transactions, which is
printed not at WAL write now, but at commit. These two events are
far from each other for synchro requests.

Part of #5139
---
 src/box/txn.c       | 122 +++++++++++++++++++++-----------------------
 src/box/txn.h       |  13 ++++-
 src/box/txn_limbo.c |   8 +--
 3 files changed, 72 insertions(+), 71 deletions(-)

diff --git a/src/box/txn.c b/src/box/txn.c
index 8aedcc9e4..fd77a934a 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -506,70 +506,13 @@ txn_run_wal_write_triggers(struct txn *txn)
 }
 
 /**
- * Complete transaction processing.
+ * If there is no fiber waiting for the transaction then the
+ * transaction could be safely freed. In the opposite case the
+ * owner fiber is in duty to free this transaction.
  */
-void
-txn_complete(struct txn *txn)
+static void
+txn_free_or_wakeup(struct txn *txn)
 {
-	assert(!txn_has_flag(txn, TXN_IS_DONE));
-	/*
-	 * Note, engine can be NULL if transaction contains
-	 * IPROTO_NOP or IPROTO_CONFIRM statements.
-	 */
-	if (txn->signature < 0) {
-		txn->status = TXN_ABORTED;
-		/* Undo the transaction. */
-		struct txn_stmt *stmt;
-		stailq_reverse(&txn->stmts);
-		stailq_foreach_entry(stmt, &txn->stmts, next)
-			txn_rollback_one_stmt(txn, stmt);
-		if (txn->engine)
-			engine_rollback(txn->engine, txn);
-		if (txn_has_flag(txn, TXN_HAS_TRIGGERS))
-			txn_run_rollback_triggers(txn, &txn->on_rollback);
-	} else if (!txn_has_flag(txn, TXN_WAIT_SYNC)) {
-		txn->status = TXN_COMMITTED;
-		/* Commit the transaction. */
-		if (txn->engine != NULL)
-			engine_commit(txn->engine, txn);
-		if (txn_has_flag(txn, TXN_HAS_TRIGGERS)) {
-			txn_run_commit_triggers(txn, &txn->on_commit);
-			/*
-			 * For async transactions WAL write ==
-			 * commit.
-			 */
-			txn_run_wal_write_triggers(txn);
-		}
-
-		double stop_tm = ev_monotonic_now(loop());
-		if (stop_tm - txn->start_tm > too_long_threshold) {
-			int n_rows = txn->n_new_rows + txn->n_applier_rows;
-			say_warn_ratelimited("too long WAL write: %d rows at "
-					     "LSN %lld: %.3f sec", n_rows,
-					     txn->signature - n_rows + 1,
-					     stop_tm - txn->start_tm);
-		}
-	} else {
-		if (txn_has_flag(txn, TXN_HAS_TRIGGERS))
-			txn_run_wal_write_triggers(txn);
-		/*
-		 * Complete is called on every WAL operation
-		 * authored by this transaction. And it not always
-		 * is one. And not always is enough for commit.
-		 * In case the transaction is waiting for acks, it
-		 * can't be committed right away. Give control
-		 * back to the fiber, owning the transaction so as
-		 * it could decide what to do next.
-		 */
-		if (txn->fiber != NULL && txn->fiber != fiber())
-			fiber_wakeup(txn->fiber);
-		return;
-	}
-	/*
-	 * If there is no fiber waiting for the transaction then
-	 * the transaction could be safely freed. In the opposite case
-	 * the fiber is in duty to free this transaction.
-	 */
 	if (txn->fiber == NULL)
 		txn_free(txn);
 	else {
@@ -580,6 +523,45 @@ txn_complete(struct txn *txn)
 	}
 }
 
+void
+txn_complete_fail(struct txn *txn)
+{
+	assert(!txn_has_flag(txn, TXN_IS_DONE));
+	assert(txn->signature < 0);
+	txn->status = TXN_ABORTED;
+	struct txn_stmt *stmt;
+	stailq_reverse(&txn->stmts);
+	stailq_foreach_entry(stmt, &txn->stmts, next)
+		txn_rollback_one_stmt(txn, stmt);
+	if (txn->engine)
+		engine_rollback(txn->engine, txn);
+	if (txn_has_flag(txn, TXN_HAS_TRIGGERS))
+		txn_run_rollback_triggers(txn, &txn->on_rollback);
+	txn_free_or_wakeup(txn);
+}
+
+void
+txn_complete_success(struct txn *txn)
+{
+	double stop_tm = ev_monotonic_now(loop());
+	if (stop_tm - txn->start_tm > too_long_threshold) {
+		int n_rows = txn->n_new_rows + txn->n_applier_rows;
+		say_warn_ratelimited("too long WAL write: %d rows at LSN %lld: "
+				     "%.3f sec", n_rows,
+				     txn->signature - n_rows + 1,
+				     stop_tm - txn->start_tm);
+	}
+	assert(!txn_has_flag(txn, TXN_IS_DONE));
+	assert(!txn_has_flag(txn, TXN_WAIT_SYNC));
+	assert(txn->signature >= 0);
+	txn->status = TXN_COMMITTED;
+	if (txn->engine != NULL)
+		engine_commit(txn->engine, txn);
+	if (txn_has_flag(txn, TXN_HAS_TRIGGERS))
+		txn_run_commit_triggers(txn, &txn->on_commit);
+	txn_free_or_wakeup(txn);
+}
+
 /** Callback invoked when the transaction's journal write is finished. */
 static void
 txn_on_journal_write(struct journal_entry *entry)
@@ -601,7 +583,17 @@ txn_on_journal_write(struct journal_entry *entry)
 	 */
 	assert(in_txn() == NULL);
 	fiber_set_txn(fiber(), txn);
-	txn_complete(txn);
+	if (txn->signature < 0) {
+		txn_complete_fail(txn);
+		goto finish;
+	}
+	if (txn_has_flag(txn, TXN_HAS_TRIGGERS))
+		txn_run_wal_write_triggers(txn);
+	if (!txn_has_flag(txn, TXN_WAIT_SYNC))
+		txn_complete_success(txn);
+	else if (txn->fiber != NULL && txn->fiber != fiber())
+		fiber_wakeup(txn->fiber);
+finish:
 	fiber_set_txn(fiber(), NULL);
 }
 
@@ -772,7 +764,7 @@ txn_commit_nop(struct txn *txn)
 {
 	if (txn->n_new_rows + txn->n_applier_rows == 0) {
 		txn->signature = TXN_SIGNATURE_NOP;
-		txn_complete(txn);
+		txn_complete_success(txn);
 		fiber_set_txn(fiber(), NULL);
 		return true;
 	}
@@ -982,7 +974,7 @@ txn_rollback(struct txn *txn)
 	if (!txn_has_flag(txn, TXN_CAN_YIELD))
 		trigger_clear(&txn->fiber_on_yield);
 	txn->signature = TXN_SIGNATURE_ROLLBACK;
-	txn_complete(txn);
+	txn_complete_fail(txn);
 	fiber_set_txn(fiber(), NULL);
 }
 
diff --git a/src/box/txn.h b/src/box/txn.h
index ffe240867..dfa7c0ee4 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -419,10 +419,19 @@ struct txn *
 txn_begin(void);
 
 /**
- * Complete transaction processing.
+ * Complete transaction processing with an error. All the changes are going to
+ * be rolled back. The transaction's signature should be set to the rollback
+ * reason.
  */
 void
-txn_complete(struct txn *txn);
+txn_complete_fail(struct txn *txn);
+
+/**
+ * Complete transaction processing successfully. All the changes are going to
+ * become committed and visible.
+ */
+void
+txn_complete_success(struct txn *txn);
 
 /**
  * Commit a transaction.
diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
index 06a3d30c3..c94678885 100644
--- a/src/box/txn_limbo.c
+++ b/src/box/txn_limbo.c
@@ -240,7 +240,7 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
 		txn_limbo_abort(limbo, e);
 		txn_clear_flag(e->txn, TXN_WAIT_SYNC);
 		txn_clear_flag(e->txn, TXN_WAIT_ACK);
-		txn_complete(e->txn);
+		txn_complete_fail(e->txn);
 		if (e == entry)
 			break;
 		fiber_wakeup(e->txn->fiber);
@@ -386,7 +386,7 @@ txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn)
 		 * branch won't exist.
 		 */
 		if (e->txn->signature >= 0)
-			txn_complete(e->txn);
+			txn_complete_success(e->txn);
 	}
 }
 
@@ -428,7 +428,7 @@ txn_limbo_read_rollback(struct txn_limbo *limbo, int64_t lsn)
 		if (e->txn->signature >= 0) {
 			/* Rollback the transaction. */
 			e->txn->signature = TXN_SIGNATURE_SYNC_ROLLBACK;
-			txn_complete(e->txn);
+			txn_complete_fail(e->txn);
 		} else {
 			/*
 			 * Rollback the transaction, but don't free it yet. It
@@ -446,7 +446,7 @@ txn_limbo_read_rollback(struct txn_limbo *limbo, int64_t lsn)
 			e->txn->signature = TXN_SIGNATURE_SYNC_ROLLBACK;
 			struct fiber *fiber = e->txn->fiber;
 			e->txn->fiber = fiber();
-			txn_complete(e->txn);
+			txn_complete_fail(e->txn);
 			e->txn->fiber = fiber;
 		}
 		if (e == last_rollback)
-- 
2.21.1 (Apple Git-122.3)



More information about the Tarantool-patches mailing list