[PATCH v2 1/4] txn: do not require space id for nop requests

Vladimir Davydov vdavydov.dev at gmail.com
Mon Jul 9 18:40:09 MSK 2018


Currently, IPROTO_NOP can only be generated by a before_replace trigger,
when it returns the old tuple, thus turning the original operation into
a NOP. In such a case we know the space id and we write it to the request
body. This allows us to dispatch NOP requests via the DML route.

As a part of the replica-local spaces feature, we will substitute
requests operating on local spaces with NOP in relay in order to promote
vclock on replicas without actual data modification. Since space_id is
stored in the request body, carrying it over into the NOP sent to
replicas would mean decoding the request body in relay, which is
overkill. To avoid that, let's separate the NOP and DML paths and remove
space_id from NOP requests.

Needed for #3443
---
 src/box/box.cc                   | 21 +++++++++++
 src/box/iproto_constants.c       |  2 +-
 src/box/request.c                |  7 ++--
 src/box/txn.c                    | 78 ++++++++++++++++++++--------------------
 test/box/before_replace.result   |  4 +--
 test/box/before_replace.test.lua |  2 +-
 test/box/on_replace.result       |  4 +--
 7 files changed, 72 insertions(+), 46 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index ba0af95e..00a025b2 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -192,6 +192,22 @@ process_rw(struct request *request, struct space *space, struct tuple **result)
 	return rc;
 }
 
+/**
+ * Process a no-op request.
+ *
+ * A no-op request does not affect any space, but it
+ * promotes vclock and is written to WAL.
+ */
+static int
+process_nop(struct request *request)
+{
+	assert(request->type == IPROTO_NOP);
+	struct txn *txn = txn_begin_stmt(NULL);
+	if (txn == NULL)
+		return -1;
+	return txn_commit_stmt(txn, request);
+}
+
 void
 box_set_ro(bool ro)
 {
@@ -279,6 +295,11 @@ apply_row(struct xstream *stream, struct xrow_header *row)
 	(void) stream;
 	struct request request;
 	xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
+	if (request.type == IPROTO_NOP) {
+		if (process_nop(&request) != 0)
+			diag_raise();
+		return;
+	}
 	struct space *space = space_cache_find_xc(request.space_id);
 	if (process_rw(&request, space, NULL) != 0) {
 		say_error("error applying row: %s", request_str(&request));
diff --git a/src/box/iproto_constants.c b/src/box/iproto_constants.c
index 3adb7cd4..5c1d3a31 100644
--- a/src/box/iproto_constants.c
+++ b/src/box/iproto_constants.c
@@ -121,7 +121,7 @@ const uint64_t iproto_body_key_map[IPROTO_TYPE_STAT_MAX] = {
 	bit(SPACE_ID) | bit(OPS) | bit(TUPLE),                 /* UPSERT */
 	0,                                                     /* CALL */
 	0,                                                     /* reserved */
-	bit(SPACE_ID),                                         /* NOP */
+	0,                                                     /* NOP */
 };
 #undef bit
 
diff --git a/src/box/request.c b/src/box/request.c
index fda54a18..8690519c 100644
--- a/src/box/request.c
+++ b/src/box/request.c
@@ -50,7 +50,6 @@ request_create_from_tuple(struct request *request, struct space *space,
 			  struct tuple *old_tuple, struct tuple *new_tuple)
 {
 	memset(request, 0, sizeof(*request));
-	request->space_id = space->def->id;
 
 	if (old_tuple == new_tuple) {
 		/*
@@ -60,7 +59,11 @@ request_create_from_tuple(struct request *request, struct space *space,
 		request->type = IPROTO_NOP;
 		return 0;
 	}
-
+	/*
+	 * Space pointer may be zero in case of NOP, in which case
+	 * this line is not reached.
+	 */
+	request->space_id = space->def->id;
 	if (new_tuple == NULL) {
 		uint32_t size, key_size;
 		const char *data = tuple_data_range(old_tuple, &size);
diff --git a/src/box/txn.c b/src/box/txn.c
index b5ad39b0..80e5463d 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -106,13 +106,15 @@ txn_rollback_to_svp(struct txn *txn, struct stailq_entry *svp)
 	stailq_cut_tail(&txn->stmts, svp, &rollback);
 	stailq_reverse(&rollback);
 	stailq_foreach_entry(stmt, &rollback, next) {
-		engine_rollback_statement(txn->engine, txn, stmt);
+		if (stmt->space != NULL) {
+			engine_rollback_statement(txn->engine, txn, stmt);
+			stmt->space = NULL;
+		}
 		if (stmt->row != NULL) {
 			assert(txn->n_rows > 0);
 			txn->n_rows--;
 			stmt->row = NULL;
 		}
-		stmt->space = NULL;
 	}
 }
 
@@ -146,7 +148,6 @@ int
 txn_begin_in_engine(struct engine *engine, struct txn *txn)
 {
 	if (txn->engine == NULL) {
-		assert(stailq_empty(&txn->stmts));
 		txn->engine = engine;
 		return engine_begin(engine, txn);
 	} else if (txn->engine != engine) {
@@ -173,6 +174,15 @@ txn_begin_stmt(struct space *space)
 		return NULL;
 	}
 
+	struct txn_stmt *stmt = txn_stmt_new(txn);
+	if (stmt == NULL) {
+		if (txn->is_autocommit && txn->in_sub_stmt == 0)
+			txn_rollback();
+		return NULL;
+	}
+	if (space == NULL)
+		return txn;
+
 	if (trigger_run(&space->on_stmt_begin, txn) != 0)
 		goto fail;
 
@@ -180,19 +190,13 @@ txn_begin_stmt(struct space *space)
 	if (txn_begin_in_engine(engine, txn) != 0)
 		goto fail;
 
-	struct txn_stmt *stmt = txn_stmt_new(txn);
-	if (stmt == NULL)
-		goto fail;
 	stmt->space = space;
+	if (engine_begin_statement(engine, txn) != 0)
+		goto fail;
 
-	if (engine_begin_statement(engine, txn) != 0) {
-		txn_rollback_stmt();
-		return NULL;
-	}
 	return txn;
 fail:
-	if (txn->is_autocommit && txn->in_sub_stmt == 0)
-		txn_rollback();
+	txn_rollback_stmt();
 	return NULL;
 }
 
@@ -211,7 +215,7 @@ txn_commit_stmt(struct txn *txn, struct request *request)
 	struct txn_stmt *stmt = txn_current_stmt(txn);
 
 	/* Create WAL record for the write requests in non-temporary spaces */
-	if (!space_is_temporary(stmt->space)) {
+	if (stmt->space == NULL || !space_is_temporary(stmt->space)) {
 		if (txn_add_redo(stmt, request) != 0)
 			goto fail;
 		++txn->n_rows;
@@ -225,7 +229,7 @@ txn_commit_stmt(struct txn *txn, struct request *request)
 	 * - perhaps we should run triggers even for deletes which
 	 *   doesn't find any rows
 	 */
-	if (!rlist_empty(&stmt->space->on_replace) &&
+	if (stmt->space != NULL && !rlist_empty(&stmt->space->on_replace) &&
 	    stmt->space->run_triggers && (stmt->old_tuple || stmt->new_tuple)) {
 		if (trigger_run(&stmt->space->on_replace, txn) != 0)
 			goto fail;
@@ -239,7 +243,6 @@ fail:
 	return -1;
 }
 
-
 static int64_t
 txn_write_to_wal(struct txn *txn)
 {
@@ -288,32 +291,31 @@ txn_commit(struct txn *txn)
 {
 	assert(txn == in_txn());
 
-	assert(stailq_empty(&txn->stmts) || txn->engine);
-
 	/* Do transaction conflict resolving */
-	if (txn->engine) {
-		if (engine_prepare(txn->engine, txn) != 0)
-			goto fail;
+	if (txn->engine != NULL &&
+	    engine_prepare(txn->engine, txn) != 0)
+		goto fail;
 
-		if (txn->n_rows > 0) {
-			txn->signature = txn_write_to_wal(txn);
-			if (txn->signature < 0)
-				goto fail;
-		}
-		/*
-		 * The transaction is in the binary log. No action below
-		 * may throw. In case an error has happened, there is
-		 * no other option but terminate.
-		 */
-		if (txn->has_triggers &&
-		    trigger_run(&txn->on_commit, txn) != 0) {
-			diag_log();
-			unreachable();
-			panic("commit trigger failed");
-		}
+	if (txn->n_rows > 0) {
+		txn->signature = txn_write_to_wal(txn);
+		if (txn->signature < 0)
+			goto fail;
+	}
+	/*
+	 * The transaction is in the binary log. No action below
+	 * may throw. In case an error has happened, there is
+	 * no other option but terminate.
+	 */
+	if (txn->has_triggers &&
+	    trigger_run(&txn->on_commit, txn) != 0) {
+		diag_log();
+		unreachable();
+		panic("commit trigger failed");
+	}
 
+	if (txn->engine != NULL)
 		engine_commit(txn->engine, txn);
-	}
+
 	TRASH(txn);
 	/** Free volatile txn memory. */
 	fiber_gc();
@@ -478,7 +480,7 @@ box_txn_rollback_to_savepoint(box_txn_savepoint_t *svp)
 	}
 	struct txn_stmt *stmt = svp->stmt == NULL ? NULL :
 			stailq_entry(svp->stmt, struct txn_stmt, next);
-	if (stmt != NULL && stmt->space == NULL) {
+	if (stmt != NULL && stmt->space == NULL && stmt->row == NULL) {
 		/*
 		 * The statement at which this savepoint was
 		 * created has been rolled back.
diff --git a/test/box/before_replace.result b/test/box/before_replace.result
index 4f47b9fa..2b6c1801 100644
--- a/test/box/before_replace.result
+++ b/test/box/before_replace.result
@@ -660,9 +660,9 @@ row.HEADER.type
 ---
 - NOP
 ...
-row.BODY.space_id == s.id
+#row.BODY
 ---
-- true
+- 0
 ...
 -- gh-3128 before_replace with run_triggers
 s2 = box.schema.space.create("test2")
diff --git a/test/box/before_replace.test.lua b/test/box/before_replace.test.lua
index 22733c1d..9b4f49cf 100644
--- a/test/box/before_replace.test.lua
+++ b/test/box/before_replace.test.lua
@@ -221,7 +221,7 @@ path = fio.pathjoin(box.cfg.wal_dir, string.format('%020d.xlog', box.info.lsn -
 fun, param, state = xlog.pairs(path)
 state, row = fun(param, state)
 row.HEADER.type
-row.BODY.space_id == s.id
+#row.BODY
 
 -- gh-3128 before_replace with run_triggers
 s2 = box.schema.space.create("test2")
diff --git a/test/box/on_replace.result b/test/box/on_replace.result
index 8c52e128..fcdb4379 100644
--- a/test/box/on_replace.result
+++ b/test/box/on_replace.result
@@ -527,14 +527,14 @@ t = s:on_replace(function () s:rename('newname') end, t)
 ...
 s:replace({8, 9})
 ---
-- error: Space _space does not support multi-statement transactions
+- error: DDL does not support multi-statement transactions
 ...
 t = s:on_replace(function () s.index.pk:rename('newname') end, t)
 ---
 ...
 s:replace({9, 10})
 ---
-- error: Space _index does not support multi-statement transactions
+- error: DDL does not support multi-statement transactions
 ...
 s:select()
 ---
-- 
2.11.0




More information about the Tarantool-patches mailing list