[PATCH 4/6] txn: unify txn_stmt tuples reference counting rules

Vladimir Davydov vdavydov.dev at gmail.com
Fri Jul 20 20:43:33 MSK 2018


Currently, the way txn_stmt::old_tuple and new_tuple are referenced
depends on the engine. For vinyl, the rules are straightforward: if
txn_stmt::{old_tuple,new_tuple} is not NULL, then the reference counter
of the corresponding tuple is incremented. Hence when a transaction is
committed or rolled back, vinyl calls tuple_unref on both
txn_stmt::old_tuple and new_tuple. For memtx, things are different: the
engine doesn't explicitly increment the reference counter of the
tuples - it simply sets new_tuple and old_tuple to the newly inserted
tuple and the replaced tuple, respectively. On commit, the reference
counter of the old tuple is decreased to delete the replaced tuple,
while on rollback the reference counter of the new tuple is decreased
to delete the new tuple.

Because of this, we can't implement the blackhole engine (aka /dev/null)
without implementing the commit and rollback engine methods: even though
such an engine doesn't store anything, it still has to set new_tuple
for the on_replace trigger, and hence the engine is responsible for
releasing that tuple on commit or rollback. Since commit/rollback are
rather inappropriate for this kind of engine, let's instead unify the
txn_stmt reference counting rules and make txn.c unreference the tuples
no matter what the engine is. This doesn't change vinyl, because it
already conforms. For memtx, this means that we need to increase the
reference counter when we insert a new tuple into a space - not a big
deal, as tuple_ref is almost free.
---
 src/box/memtx_engine.c | 30 ++++++++--------------------
 src/box/memtx_space.c  | 53 ++++++++++++++++++++++++--------------------------
 src/box/txn.c          | 19 ++++++++++++++++++
 src/box/vinyl.c        | 25 ++++--------------------
 4 files changed, 56 insertions(+), 71 deletions(-)

diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index a1ce4fff..f5ace926 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -436,8 +436,9 @@ memtx_engine_rollback_statement(struct engine *engine, struct txn *txn,
 
 	/* Only roll back the changes if they were made. */
 	if (stmt->engine_savepoint == NULL)
-		index_count = 0;
-	else if (memtx_space->replace == memtx_space_replace_all_keys)
+		return;
+
+	if (memtx_space->replace == memtx_space_replace_all_keys)
 		index_count = space->index_count;
 	else if (memtx_space->replace == memtx_space_replace_primary_key)
 		index_count = 1;
@@ -455,16 +456,12 @@ memtx_engine_rollback_statement(struct engine *engine, struct txn *txn,
 			panic("failed to rollback change");
 		}
 	}
-	/** Reset to old bsize, if it was changed. */
-	if (stmt->engine_savepoint != NULL)
-		memtx_space_update_bsize(space, stmt->new_tuple,
-					 stmt->old_tuple);
 
-	if (stmt->new_tuple)
+	memtx_space_update_bsize(space, stmt->new_tuple, stmt->old_tuple);
+	if (stmt->old_tuple != NULL)
+		tuple_ref(stmt->old_tuple);
+	if (stmt->new_tuple != NULL)
 		tuple_unref(stmt->new_tuple);
-
-	stmt->old_tuple = NULL;
-	stmt->new_tuple = NULL;
 }
 
 static void
@@ -480,17 +477,6 @@ memtx_engine_rollback(struct engine *engine, struct txn *txn)
 		memtx_engine_rollback_statement(engine, txn, stmt);
 }
 
-static void
-memtx_engine_commit(struct engine *engine, struct txn *txn)
-{
-	(void)engine;
-	struct txn_stmt *stmt;
-	stailq_foreach_entry(stmt, &txn->stmts, next) {
-		if (stmt->old_tuple)
-			tuple_unref(stmt->old_tuple);
-	}
-}
-
 static int
 memtx_engine_bootstrap(struct engine *engine)
 {
@@ -973,7 +959,7 @@ static const struct engine_vtab memtx_engine_vtab = {
 	/* .begin = */ memtx_engine_begin,
 	/* .begin_statement = */ memtx_engine_begin_statement,
 	/* .prepare = */ memtx_engine_prepare,
-	/* .commit = */ memtx_engine_commit,
+	/* .commit = */ generic_engine_commit,
 	/* .rollback_statement = */ memtx_engine_rollback_statement,
 	/* .rollback = */ memtx_engine_rollback,
 	/* .bootstrap = */ memtx_engine_bootstrap,
diff --git a/src/box/memtx_space.c b/src/box/memtx_space.c
index 26d70482..08ae0daa 100644
--- a/src/box/memtx_space.c
+++ b/src/box/memtx_space.c
@@ -127,6 +127,7 @@ memtx_space_replace_build_next(struct space *space, struct tuple *old_tuple,
 	if (index_build_next(space->index[0], new_tuple) != 0)
 		return -1;
 	memtx_space_update_bsize(space, NULL, new_tuple);
+	tuple_ref(new_tuple);
 	return 0;
 }
 
@@ -144,6 +145,8 @@ memtx_space_replace_primary_key(struct space *space, struct tuple *old_tuple,
 			  new_tuple, mode, &old_tuple) != 0)
 		return -1;
 	memtx_space_update_bsize(space, old_tuple, new_tuple);
+	if (new_tuple != NULL)
+		tuple_ref(new_tuple);
 	*result = old_tuple;
 	return 0;
 }
@@ -273,6 +276,8 @@ memtx_space_replace_all_keys(struct space *space, struct tuple *old_tuple,
 	}
 
 	memtx_space_update_bsize(space, old_tuple, new_tuple);
+	if (new_tuple != NULL)
+		tuple_ref(new_tuple);
 	*result = old_tuple;
 	return 0;
 
@@ -338,11 +343,9 @@ memtx_space_execute_replace(struct space *space, struct txn *txn,
 	if (stmt->new_tuple == NULL)
 		return -1;
 	tuple_ref(stmt->new_tuple);
-	struct tuple *old_tuple;
-	if (memtx_space->replace(space, stmt->old_tuple, stmt->new_tuple,
-				 mode, &old_tuple) != 0)
+	if (memtx_space->replace(space, NULL, stmt->new_tuple,
+				 mode, &stmt->old_tuple) != 0)
 		return -1;
-	stmt->old_tuple = old_tuple;
 	stmt->engine_savepoint = stmt;
 	/** The new tuple is referenced by the primary key. */
 	*result = stmt->new_tuple;
@@ -363,14 +366,13 @@ memtx_space_execute_delete(struct space *space, struct txn *txn,
 	uint32_t part_count = mp_decode_array(&key);
 	if (exact_key_validate(pk->def->key_def, key, part_count) != 0)
 		return -1;
-	if (index_get(pk, key, part_count, &stmt->old_tuple) != 0)
+	struct tuple *old_tuple;
+	if (index_get(pk, key, part_count, &old_tuple) != 0)
 		return -1;
-	struct tuple *old_tuple = NULL;
-	if (stmt->old_tuple != NULL &&
-	    memtx_space->replace(space, stmt->old_tuple, stmt->new_tuple,
-				 DUP_REPLACE_OR_INSERT, &old_tuple) != 0)
+	if (old_tuple != NULL &&
+	    memtx_space->replace(space, old_tuple, NULL,
+				 DUP_REPLACE_OR_INSERT, &stmt->old_tuple) != 0)
 		return -1;
-	stmt->old_tuple = old_tuple;
 	stmt->engine_savepoint = stmt;
 	*result = stmt->old_tuple;
 	return 0;
@@ -390,17 +392,18 @@ memtx_space_execute_update(struct space *space, struct txn *txn,
 	uint32_t part_count = mp_decode_array(&key);
 	if (exact_key_validate(pk->def->key_def, key, part_count) != 0)
 		return -1;
-	if (index_get(pk, key, part_count, &stmt->old_tuple) != 0)
+	struct tuple *old_tuple;
+	if (index_get(pk, key, part_count, &old_tuple) != 0)
 		return -1;
 
-	if (stmt->old_tuple == NULL) {
+	if (old_tuple == NULL) {
 		*result = NULL;
 		return 0;
 	}
 
 	/* Update the tuple; legacy, request ops are in request->tuple */
 	uint32_t new_size = 0, bsize;
-	const char *old_data = tuple_data_range(stmt->old_tuple, &bsize);
+	const char *old_data = tuple_data_range(old_tuple, &bsize);
 	const char *new_data =
 		tuple_update_execute(region_aligned_alloc_cb, &fiber()->gc,
 				     request->tuple, request->tuple_end,
@@ -414,12 +417,9 @@ memtx_space_execute_update(struct space *space, struct txn *txn,
 	if (stmt->new_tuple == NULL)
 		return -1;
 	tuple_ref(stmt->new_tuple);
-	struct tuple *old_tuple = NULL;
-	if (stmt->old_tuple != NULL &&
-	    memtx_space->replace(space, stmt->old_tuple, stmt->new_tuple,
-				 DUP_REPLACE, &old_tuple) != 0)
+	if (memtx_space->replace(space, old_tuple, stmt->new_tuple,
+				 DUP_REPLACE, &stmt->old_tuple) != 0)
 		return -1;
-	stmt->old_tuple = old_tuple;
 	stmt->engine_savepoint = stmt;
 	*result = stmt->new_tuple;
 	return 0;
@@ -453,10 +453,11 @@ memtx_space_execute_upsert(struct space *space, struct txn *txn,
 	mp_decode_array(&key);
 
 	/* Try to find the tuple by primary key. */
-	if (index_get(index, key, part_count, &stmt->old_tuple) != 0)
+	struct tuple *old_tuple;
+	if (index_get(index, key, part_count, &old_tuple) != 0)
 		return -1;
 
-	if (stmt->old_tuple == NULL) {
+	if (old_tuple == NULL) {
 		/**
 		 * Old tuple was not found. A write optimized
 		 * engine may only know this after commit, so
@@ -486,8 +487,7 @@ memtx_space_execute_upsert(struct space *space, struct txn *txn,
 		tuple_ref(stmt->new_tuple);
 	} else {
 		uint32_t new_size = 0, bsize;
-		const char *old_data = tuple_data_range(stmt->old_tuple,
-							&bsize);
+		const char *old_data = tuple_data_range(old_tuple, &bsize);
 		/*
 		 * Update the tuple.
 		 * tuple_upsert_execute() fails on totally wrong
@@ -514,14 +514,13 @@ memtx_space_execute_upsert(struct space *space, struct txn *txn,
 		struct index *pk = space->index[0];
 		if (!key_update_can_be_skipped(pk->def->key_def->column_mask,
 					       column_mask) &&
-		    tuple_compare(stmt->old_tuple, stmt->new_tuple,
+		    tuple_compare(old_tuple, stmt->new_tuple,
 				  pk->def->key_def) != 0) {
 			/* Primary key is changed: log error and do nothing. */
 			diag_set(ClientError, ER_CANT_UPDATE_PRIMARY_KEY,
 				 pk->def->name, space_name(space));
 			diag_log();
 			tuple_unref(stmt->new_tuple);
-			stmt->old_tuple = NULL;
 			stmt->new_tuple = NULL;
 		}
 	}
@@ -531,12 +530,10 @@ memtx_space_execute_upsert(struct space *space, struct txn *txn,
 	 * we checked this case explicitly and skipped the upsert
 	 * above.
 	 */
-	struct tuple *old_tuple = NULL;
 	if (stmt->new_tuple != NULL &&
-	    memtx_space->replace(space, stmt->old_tuple, stmt->new_tuple,
-				 DUP_REPLACE_OR_INSERT, &old_tuple) != 0)
+	    memtx_space->replace(space, old_tuple, stmt->new_tuple,
+				 DUP_REPLACE_OR_INSERT, &stmt->old_tuple) != 0)
 		return -1;
-	stmt->old_tuple = old_tuple;
 	stmt->engine_savepoint = stmt;
 	/* Return nothing: UPSERT does not return data. */
 	return 0;
diff --git a/src/box/txn.c b/src/box/txn.c
index cb301015..fbce612a 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -99,6 +99,15 @@ txn_stmt_new(struct txn *txn)
 	return stmt;
 }
 
+static inline void
+txn_stmt_unref_tuples(struct txn_stmt *stmt)
+{
+	if (stmt->old_tuple != NULL)
+		tuple_unref(stmt->old_tuple);
+	if (stmt->new_tuple != NULL)
+		tuple_unref(stmt->new_tuple);
+}
+
 static void
 txn_rollback_to_svp(struct txn *txn, struct stailq_entry *svp)
 {
@@ -116,6 +125,7 @@ txn_rollback_to_svp(struct txn *txn, struct stailq_entry *svp)
 			txn->n_rows--;
 			stmt->row = NULL;
 		}
+		txn_stmt_unref_tuples(stmt);
 	}
 }
 
@@ -325,6 +335,10 @@ txn_commit(struct txn *txn)
 	if (txn->engine != NULL)
 		engine_commit(txn->engine, txn);
 
+	struct txn_stmt *stmt;
+	stailq_foreach_entry(stmt, &txn->stmts, next)
+		txn_stmt_unref_tuples(stmt);
+
 	TRASH(txn);
 	/** Free volatile txn memory. */
 	fiber_gc();
@@ -362,6 +376,11 @@ txn_rollback()
 	}
 	if (txn->engine)
 		engine_rollback(txn->engine, txn);
+
+	struct txn_stmt *stmt;
+	stailq_foreach_entry(stmt, &txn->stmts, next)
+		txn_stmt_unref_tuples(stmt);
+
 	TRASH(txn);
 	/** Free volatile txn memory. */
 	fiber_gc();
diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index f02fa638..374c5252 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -2324,17 +2324,6 @@ vinyl_space_execute_upsert(struct space *space, struct txn *txn,
 	return vy_upsert(env, tx, stmt, space, request);
 }
 
-static inline void
-txn_stmt_unref_tuples(struct txn_stmt *stmt)
-{
-	if (stmt->old_tuple)
-		tuple_unref(stmt->old_tuple);
-	if (stmt->new_tuple)
-		tuple_unref(stmt->new_tuple);
-	stmt->old_tuple = NULL;
-	stmt->new_tuple = NULL;
-}
-
 static int
 vinyl_engine_begin(struct engine *engine, struct txn *txn)
 {
@@ -2442,11 +2431,7 @@ vinyl_engine_commit(struct engine *engine, struct txn *txn)
 	/* We can't abort the transaction at this point, use force. */
 	vy_quota_force_use(&env->quota, mem_used_after - mem_used_before);
 
-	struct txn_stmt *stmt;
-	stailq_foreach_entry(stmt, &txn->stmts, next)
-		txn_stmt_unref_tuples(stmt);
 	txn->engine_tx = NULL;
-
 	if (!txn->is_autocommit)
 		trigger_clear(&txn->fiber_on_stop);
 }
@@ -2461,11 +2446,7 @@ vinyl_engine_rollback(struct engine *engine, struct txn *txn)
 
 	vy_tx_rollback(tx);
 
-	struct txn_stmt *stmt;
-	stailq_foreach_entry(stmt, &txn->stmts, next)
-		txn_stmt_unref_tuples(stmt);
 	txn->engine_tx = NULL;
-
 	if (!txn->is_autocommit)
 		trigger_clear(&txn->fiber_on_stop);
 }
@@ -2489,7 +2470,6 @@ vinyl_engine_rollback_statement(struct engine *engine, struct txn *txn,
 	struct vy_tx *tx = txn->engine_tx;
 	assert(tx != NULL);
 	vy_tx_rollback_to_savepoint(tx, stmt->engine_savepoint);
-	txn_stmt_unref_tuples(stmt);
 }
 
 /* }}} Public API of transaction control */
@@ -3290,7 +3270,10 @@ vinyl_space_apply_initial_join_row(struct space *space, struct request *request)
 	else
 		vy_tx_rollback(tx);
 
-	txn_stmt_unref_tuples(&stmt);
+	if (stmt.old_tuple != NULL)
+		tuple_unref(stmt.old_tuple);
+	if (stmt.new_tuple != NULL)
+		tuple_unref(stmt.new_tuple);
 
 	size_t mem_used_after = lsregion_used(&env->mem_env.allocator);
 	assert(mem_used_after >= mem_used_before);
-- 
2.11.0




More information about the Tarantool-patches mailing list