Tarantool development patches archive
 help / color / mirror / Atom feed
From: Vladimir Davydov <vdavydov.dev@gmail.com>
To: kostja@tarantool.org
Cc: tarantool-patches@freelists.org
Subject: [PATCH 4/6] txn: unify txn_stmt tuples reference counting rules
Date: Fri, 20 Jul 2018 20:43:33 +0300	[thread overview]
Message-ID: <1c2808bca96fc0c463c812d1f26a17ba6fa78c39.1532107326.git.vdavydov.dev@gmail.com> (raw)
In-Reply-To: <cover.1532107326.git.vdavydov.dev@gmail.com>
In-Reply-To: <cover.1532107326.git.vdavydov.dev@gmail.com>

Currently, the way txn_stmt::old_tuple and new_tuple are referenced
depends on the engine. For vinyl, the rules are straightforward: if
txn_stmt::{old_tuple,new_tuple} is not NULL, then the reference to the
corresponding tuple is elevated. Hence when a transaction is committed
or rolled back, vinyl calls tuple_unref on both txn_stmt::old_tuple and
new_tuple. For memtx, things are different: the engine doesn't
explicitly increment the reference counter of the tuples - it simply
sets them to the newly inserted tuple and the replaced tuple. On commit,
the reference counter of the old tuple is decreased to delete the
replaced tuple, while on rollback the reference counter of the new tuple
is decreased to delete the new tuple.

Because of this, we can't implement the blackhole engine (aka /dev/null)
without implementing commit and rollback engine methods - even though
such an engine doesn't store anything it still has to set the new_tuple
for on_replace trigger and hence it is responsible for releasing it on
commit or rollback. Since commit/rollback are rather inappropriate for
this kind of engine, let's instead unify txn_stmt reference counting
rules and make txn.c unreference the tuples no matter what engine is.
This doesn't change vinyl, because it already conforms. For memtx, this
means that we need to increase the reference counter when we insert a
new tuple into a space - not a big deal as tuple_ref is almost free.
---
 src/box/memtx_engine.c | 30 ++++++++--------------------
 src/box/memtx_space.c  | 53 ++++++++++++++++++++++++--------------------------
 src/box/txn.c          | 19 ++++++++++++++++++
 src/box/vinyl.c        | 25 ++++--------------------
 4 files changed, 56 insertions(+), 71 deletions(-)

diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index a1ce4fff..f5ace926 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -436,8 +436,9 @@ memtx_engine_rollback_statement(struct engine *engine, struct txn *txn,
 
 	/* Only roll back the changes if they were made. */
 	if (stmt->engine_savepoint == NULL)
-		index_count = 0;
-	else if (memtx_space->replace == memtx_space_replace_all_keys)
+		return;
+
+	if (memtx_space->replace == memtx_space_replace_all_keys)
 		index_count = space->index_count;
 	else if (memtx_space->replace == memtx_space_replace_primary_key)
 		index_count = 1;
@@ -455,16 +456,12 @@ memtx_engine_rollback_statement(struct engine *engine, struct txn *txn,
 			panic("failed to rollback change");
 		}
 	}
-	/** Reset to old bsize, if it was changed. */
-	if (stmt->engine_savepoint != NULL)
-		memtx_space_update_bsize(space, stmt->new_tuple,
-					 stmt->old_tuple);
 
-	if (stmt->new_tuple)
+	memtx_space_update_bsize(space, stmt->new_tuple, stmt->old_tuple);
+	if (stmt->old_tuple != NULL)
+		tuple_ref(stmt->old_tuple);
+	if (stmt->new_tuple != NULL)
 		tuple_unref(stmt->new_tuple);
-
-	stmt->old_tuple = NULL;
-	stmt->new_tuple = NULL;
 }
 
 static void
@@ -480,17 +477,6 @@ memtx_engine_rollback(struct engine *engine, struct txn *txn)
 		memtx_engine_rollback_statement(engine, txn, stmt);
 }
 
-static void
-memtx_engine_commit(struct engine *engine, struct txn *txn)
-{
-	(void)engine;
-	struct txn_stmt *stmt;
-	stailq_foreach_entry(stmt, &txn->stmts, next) {
-		if (stmt->old_tuple)
-			tuple_unref(stmt->old_tuple);
-	}
-}
-
 static int
 memtx_engine_bootstrap(struct engine *engine)
 {
@@ -973,7 +959,7 @@ static const struct engine_vtab memtx_engine_vtab = {
 	/* .begin = */ memtx_engine_begin,
 	/* .begin_statement = */ memtx_engine_begin_statement,
 	/* .prepare = */ memtx_engine_prepare,
-	/* .commit = */ memtx_engine_commit,
+	/* .commit = */ generic_engine_commit,
 	/* .rollback_statement = */ memtx_engine_rollback_statement,
 	/* .rollback = */ memtx_engine_rollback,
 	/* .bootstrap = */ memtx_engine_bootstrap,
diff --git a/src/box/memtx_space.c b/src/box/memtx_space.c
index 26d70482..08ae0daa 100644
--- a/src/box/memtx_space.c
+++ b/src/box/memtx_space.c
@@ -127,6 +127,7 @@ memtx_space_replace_build_next(struct space *space, struct tuple *old_tuple,
 	if (index_build_next(space->index[0], new_tuple) != 0)
 		return -1;
 	memtx_space_update_bsize(space, NULL, new_tuple);
+	tuple_ref(new_tuple);
 	return 0;
 }
 
@@ -144,6 +145,8 @@ memtx_space_replace_primary_key(struct space *space, struct tuple *old_tuple,
 			  new_tuple, mode, &old_tuple) != 0)
 		return -1;
 	memtx_space_update_bsize(space, old_tuple, new_tuple);
+	if (new_tuple != NULL)
+		tuple_ref(new_tuple);
 	*result = old_tuple;
 	return 0;
 }
@@ -273,6 +276,8 @@ memtx_space_replace_all_keys(struct space *space, struct tuple *old_tuple,
 	}
 
 	memtx_space_update_bsize(space, old_tuple, new_tuple);
+	if (new_tuple != NULL)
+		tuple_ref(new_tuple);
 	*result = old_tuple;
 	return 0;
 
@@ -338,11 +343,9 @@ memtx_space_execute_replace(struct space *space, struct txn *txn,
 	if (stmt->new_tuple == NULL)
 		return -1;
 	tuple_ref(stmt->new_tuple);
-	struct tuple *old_tuple;
-	if (memtx_space->replace(space, stmt->old_tuple, stmt->new_tuple,
-				 mode, &old_tuple) != 0)
+	if (memtx_space->replace(space, NULL, stmt->new_tuple,
+				 mode, &stmt->old_tuple) != 0)
 		return -1;
-	stmt->old_tuple = old_tuple;
 	stmt->engine_savepoint = stmt;
 	/** The new tuple is referenced by the primary key. */
 	*result = stmt->new_tuple;
@@ -363,14 +366,13 @@ memtx_space_execute_delete(struct space *space, struct txn *txn,
 	uint32_t part_count = mp_decode_array(&key);
 	if (exact_key_validate(pk->def->key_def, key, part_count) != 0)
 		return -1;
-	if (index_get(pk, key, part_count, &stmt->old_tuple) != 0)
+	struct tuple *old_tuple;
+	if (index_get(pk, key, part_count, &old_tuple) != 0)
 		return -1;
-	struct tuple *old_tuple = NULL;
-	if (stmt->old_tuple != NULL &&
-	    memtx_space->replace(space, stmt->old_tuple, stmt->new_tuple,
-				 DUP_REPLACE_OR_INSERT, &old_tuple) != 0)
+	if (old_tuple != NULL &&
+	    memtx_space->replace(space, old_tuple, NULL,
+				 DUP_REPLACE_OR_INSERT, &stmt->old_tuple) != 0)
 		return -1;
-	stmt->old_tuple = old_tuple;
 	stmt->engine_savepoint = stmt;
 	*result = stmt->old_tuple;
 	return 0;
@@ -390,17 +392,18 @@ memtx_space_execute_update(struct space *space, struct txn *txn,
 	uint32_t part_count = mp_decode_array(&key);
 	if (exact_key_validate(pk->def->key_def, key, part_count) != 0)
 		return -1;
-	if (index_get(pk, key, part_count, &stmt->old_tuple) != 0)
+	struct tuple *old_tuple;
+	if (index_get(pk, key, part_count, &old_tuple) != 0)
 		return -1;
 
-	if (stmt->old_tuple == NULL) {
+	if (old_tuple == NULL) {
 		*result = NULL;
 		return 0;
 	}
 
 	/* Update the tuple; legacy, request ops are in request->tuple */
 	uint32_t new_size = 0, bsize;
-	const char *old_data = tuple_data_range(stmt->old_tuple, &bsize);
+	const char *old_data = tuple_data_range(old_tuple, &bsize);
 	const char *new_data =
 		tuple_update_execute(region_aligned_alloc_cb, &fiber()->gc,
 				     request->tuple, request->tuple_end,
@@ -414,12 +417,9 @@ memtx_space_execute_update(struct space *space, struct txn *txn,
 	if (stmt->new_tuple == NULL)
 		return -1;
 	tuple_ref(stmt->new_tuple);
-	struct tuple *old_tuple = NULL;
-	if (stmt->old_tuple != NULL &&
-	    memtx_space->replace(space, stmt->old_tuple, stmt->new_tuple,
-				 DUP_REPLACE, &old_tuple) != 0)
+	if (memtx_space->replace(space, old_tuple, stmt->new_tuple,
+				 DUP_REPLACE, &stmt->old_tuple) != 0)
 		return -1;
-	stmt->old_tuple = old_tuple;
 	stmt->engine_savepoint = stmt;
 	*result = stmt->new_tuple;
 	return 0;
@@ -453,10 +453,11 @@ memtx_space_execute_upsert(struct space *space, struct txn *txn,
 	mp_decode_array(&key);
 
 	/* Try to find the tuple by primary key. */
-	if (index_get(index, key, part_count, &stmt->old_tuple) != 0)
+	struct tuple *old_tuple;
+	if (index_get(index, key, part_count, &old_tuple) != 0)
 		return -1;
 
-	if (stmt->old_tuple == NULL) {
+	if (old_tuple == NULL) {
 		/**
 		 * Old tuple was not found. A write optimized
 		 * engine may only know this after commit, so
@@ -486,8 +487,7 @@ memtx_space_execute_upsert(struct space *space, struct txn *txn,
 		tuple_ref(stmt->new_tuple);
 	} else {
 		uint32_t new_size = 0, bsize;
-		const char *old_data = tuple_data_range(stmt->old_tuple,
-							&bsize);
+		const char *old_data = tuple_data_range(old_tuple, &bsize);
 		/*
 		 * Update the tuple.
 		 * tuple_upsert_execute() fails on totally wrong
@@ -514,14 +514,13 @@ memtx_space_execute_upsert(struct space *space, struct txn *txn,
 		struct index *pk = space->index[0];
 		if (!key_update_can_be_skipped(pk->def->key_def->column_mask,
 					       column_mask) &&
-		    tuple_compare(stmt->old_tuple, stmt->new_tuple,
+		    tuple_compare(old_tuple, stmt->new_tuple,
 				  pk->def->key_def) != 0) {
 			/* Primary key is changed: log error and do nothing. */
 			diag_set(ClientError, ER_CANT_UPDATE_PRIMARY_KEY,
 				 pk->def->name, space_name(space));
 			diag_log();
 			tuple_unref(stmt->new_tuple);
-			stmt->old_tuple = NULL;
 			stmt->new_tuple = NULL;
 		}
 	}
@@ -531,12 +530,10 @@ memtx_space_execute_upsert(struct space *space, struct txn *txn,
 	 * we checked this case explicitly and skipped the upsert
 	 * above.
 	 */
-	struct tuple *old_tuple = NULL;
 	if (stmt->new_tuple != NULL &&
-	    memtx_space->replace(space, stmt->old_tuple, stmt->new_tuple,
-				 DUP_REPLACE_OR_INSERT, &old_tuple) != 0)
+	    memtx_space->replace(space, old_tuple, stmt->new_tuple,
+				 DUP_REPLACE_OR_INSERT, &stmt->old_tuple) != 0)
 		return -1;
-	stmt->old_tuple = old_tuple;
 	stmt->engine_savepoint = stmt;
 	/* Return nothing: UPSERT does not return data. */
 	return 0;
diff --git a/src/box/txn.c b/src/box/txn.c
index cb301015..fbce612a 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -99,6 +99,15 @@ txn_stmt_new(struct txn *txn)
 	return stmt;
 }
 
+static inline void
+txn_stmt_unref_tuples(struct txn_stmt *stmt)
+{
+	if (stmt->old_tuple != NULL)
+		tuple_unref(stmt->old_tuple);
+	if (stmt->new_tuple != NULL)
+		tuple_unref(stmt->new_tuple);
+}
+
 static void
 txn_rollback_to_svp(struct txn *txn, struct stailq_entry *svp)
 {
@@ -116,6 +125,7 @@ txn_rollback_to_svp(struct txn *txn, struct stailq_entry *svp)
 			txn->n_rows--;
 			stmt->row = NULL;
 		}
+		txn_stmt_unref_tuples(stmt);
 	}
 }
 
@@ -325,6 +335,10 @@ txn_commit(struct txn *txn)
 	if (txn->engine != NULL)
 		engine_commit(txn->engine, txn);
 
+	struct txn_stmt *stmt;
+	stailq_foreach_entry(stmt, &txn->stmts, next)
+		txn_stmt_unref_tuples(stmt);
+
 	TRASH(txn);
 	/** Free volatile txn memory. */
 	fiber_gc();
@@ -362,6 +376,11 @@ txn_rollback()
 	}
 	if (txn->engine)
 		engine_rollback(txn->engine, txn);
+
+	struct txn_stmt *stmt;
+	stailq_foreach_entry(stmt, &txn->stmts, next)
+		txn_stmt_unref_tuples(stmt);
+
 	TRASH(txn);
 	/** Free volatile txn memory. */
 	fiber_gc();
diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index f02fa638..374c5252 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -2324,17 +2324,6 @@ vinyl_space_execute_upsert(struct space *space, struct txn *txn,
 	return vy_upsert(env, tx, stmt, space, request);
 }
 
-static inline void
-txn_stmt_unref_tuples(struct txn_stmt *stmt)
-{
-	if (stmt->old_tuple)
-		tuple_unref(stmt->old_tuple);
-	if (stmt->new_tuple)
-		tuple_unref(stmt->new_tuple);
-	stmt->old_tuple = NULL;
-	stmt->new_tuple = NULL;
-}
-
 static int
 vinyl_engine_begin(struct engine *engine, struct txn *txn)
 {
@@ -2442,11 +2431,7 @@ vinyl_engine_commit(struct engine *engine, struct txn *txn)
 	/* We can't abort the transaction at this point, use force. */
 	vy_quota_force_use(&env->quota, mem_used_after - mem_used_before);
 
-	struct txn_stmt *stmt;
-	stailq_foreach_entry(stmt, &txn->stmts, next)
-		txn_stmt_unref_tuples(stmt);
 	txn->engine_tx = NULL;
-
 	if (!txn->is_autocommit)
 		trigger_clear(&txn->fiber_on_stop);
 }
@@ -2461,11 +2446,7 @@ vinyl_engine_rollback(struct engine *engine, struct txn *txn)
 
 	vy_tx_rollback(tx);
 
-	struct txn_stmt *stmt;
-	stailq_foreach_entry(stmt, &txn->stmts, next)
-		txn_stmt_unref_tuples(stmt);
 	txn->engine_tx = NULL;
-
 	if (!txn->is_autocommit)
 		trigger_clear(&txn->fiber_on_stop);
 }
@@ -2489,7 +2470,6 @@ vinyl_engine_rollback_statement(struct engine *engine, struct txn *txn,
 	struct vy_tx *tx = txn->engine_tx;
 	assert(tx != NULL);
 	vy_tx_rollback_to_savepoint(tx, stmt->engine_savepoint);
-	txn_stmt_unref_tuples(stmt);
 }
 
 /* }}} Public API of transaction control */
@@ -3290,7 +3270,10 @@ vinyl_space_apply_initial_join_row(struct space *space, struct request *request)
 	else
 		vy_tx_rollback(tx);
 
-	txn_stmt_unref_tuples(&stmt);
+	if (stmt.old_tuple != NULL)
+		tuple_unref(stmt.old_tuple);
+	if (stmt.new_tuple != NULL)
+		tuple_unref(stmt.new_tuple);
 
 	size_t mem_used_after = lsregion_used(&env->mem_env.allocator);
 	assert(mem_used_after >= mem_used_before);
-- 
2.11.0

  parent reply	other threads:[~2018-07-20 17:43 UTC|newest]

Thread overview: 24+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2018-07-20 17:43 [PATCH 0/6] Introduce blackhole engine Vladimir Davydov
2018-07-20 17:43 ` [PATCH 1/6] Add generic engine, space, index method stubs Vladimir Davydov
2018-07-20 17:56   ` Konstantin Osipov
2018-07-21 12:24   ` Vladimir Davydov
2018-07-20 17:43 ` [PATCH 2/6] Merge sysview_index.[hc] and sysview_engine.[hc] Vladimir Davydov
2018-07-20 18:02   ` Konstantin Osipov
2018-07-21 12:24   ` Vladimir Davydov
2018-07-20 17:43 ` [PATCH 3/6] Rework memtx replace function Vladimir Davydov
2018-07-20 18:04   ` Konstantin Osipov
2018-07-21 12:24   ` Vladimir Davydov
2018-07-20 17:43 ` Vladimir Davydov [this message]
2018-07-20 18:31   ` [PATCH 4/6] txn: unify txn_stmt tuples reference counting rules Konstantin Osipov
2018-07-21 12:24   ` Vladimir Davydov
2018-07-20 17:43 ` [PATCH 5/6] space: call before_replace trigger even if space has no indexes Vladimir Davydov
2018-07-20 18:39   ` Konstantin Osipov
2018-07-21 12:26     ` Vladimir Davydov
2018-07-21 20:59       ` Konstantin Osipov
2018-07-23 10:15         ` Vladimir Davydov
2018-07-20 17:43 ` [PATCH 6/6] Introduce blackhole engine Vladimir Davydov
2018-07-20 18:42   ` Konstantin Osipov
2018-07-21 12:35     ` Vladimir Davydov
2018-07-23 11:02       ` Vladimir Davydov
2018-07-23 12:53         ` Vladimir Davydov
2018-07-23 18:30 ` [PATCH 0/6] " Konstantin Osipov

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=1c2808bca96fc0c463c812d1f26a17ba6fa78c39.1532107326.git.vdavydov.dev@gmail.com \
    --to=vdavydov.dev@gmail.com \
    --cc=kostja@tarantool.org \
    --cc=tarantool-patches@freelists.org \
    --subject='Re: [PATCH 4/6] txn: unify txn_stmt tuples reference counting rules' \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox