From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: From: Vladimir Davydov Subject: [PATCH 4/6] txn: do not require space id for nop requests Date: Wed, 13 Jun 2018 19:10:36 +0300 Message-Id: <4dfddcb3838385c88cbfdf51d52ef112f56cf130.1528906027.git.vdavydov.dev@gmail.com> In-Reply-To: References: In-Reply-To: References: To: kostja@tarantool.org Cc: tarantool-patches@freelists.org List-ID: Currently, IPROTO_NOP can only be generated by a before_replace trigger, when it returns the old tuple thus turning the original operation into a NOP. In such a case we know the space id and we write it to the request body. This allows us to dispatch NOP requests via DML route. As a part of replica local spaces feature, we will substitute requests operating on local spaces with NOP in relay in order to promote vclock on replicas without actual data modification. Since space_id is stored in request body, sending it to replicas would mean decoding the request body in relay, which is an overkill. To avoid that, let's separate NOP and DML paths and remove space_id from NOP requests. Needed for #3443 --- src/box/box.cc | 5 ++ src/box/iproto_constants.c | 2 +- src/box/request.c | 2 +- src/box/txn.c | 99 ++++++++++++++++++++++++++-------------- src/box/txn.h | 12 +++++ test/box/before_replace.result | 4 +- test/box/before_replace.test.lua | 2 +- 7 files changed, 86 insertions(+), 40 deletions(-) diff --git a/src/box/box.cc b/src/box/box.cc index c1d15644..8248cc77 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -277,6 +277,11 @@ apply_row(struct xstream *stream, struct xrow_header *row) (void) stream; struct request request; xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type)); + if (request.type == IPROTO_NOP) { + if (txn_commit_nop(&request) != 0) + diag_raise(); + return; + } struct space *space = space_cache_find_xc(request.space_id); if (process_rw(&request, space, NULL) != 0) { say_error("error applying row: %s", request_str(&request)); diff --git a/src/box/iproto_constants.c b/src/box/iproto_constants.c index 3adb7cd4..5c1d3a31 100644 --- a/src/box/iproto_constants.c +++ b/src/box/iproto_constants.c @@ -121,7 +121,7 @@ const uint64_t iproto_body_key_map[IPROTO_TYPE_STAT_MAX] = { bit(SPACE_ID) | bit(OPS) | bit(TUPLE), /* UPSERT */ 0, /* CALL */ 0, /* reserved */ - bit(SPACE_ID), /* NOP */ + 0, /* NOP */ }; #undef bit diff --git a/src/box/request.c b/src/box/request.c index fda54a18..6633153c 100644 --- a/src/box/request.c +++ b/src/box/request.c @@ -50,7 +50,6 @@ request_create_from_tuple(struct request *request, struct space *space, struct tuple *old_tuple, struct tuple *new_tuple) { memset(request, 0, sizeof(*request)); - request->space_id = space->def->id; if (old_tuple == new_tuple) { /* @@ -61,6 +60,7 @@ request_create_from_tuple(struct request *request, struct space *space, return 0; } + request->space_id = space->def->id; if (new_tuple == NULL) { uint32_t size, key_size; const char *data = tuple_data_range(old_tuple, &size); diff --git a/src/box/txn.c b/src/box/txn.c index e25c0e0e..362030a9 100644 --- a/src/box/txn.c +++ b/src/box/txn.c @@ -71,9 +71,9 @@ txn_add_redo(struct txn_stmt *stmt, struct request *request) return 0; } -/** Initialize a new stmt object within txn. */ +/** Allocate a new txn statement. */ static struct txn_stmt * -txn_stmt_new(struct txn *txn) +txn_stmt_new(void) { struct txn_stmt *stmt; stmt = region_alloc_object(&fiber()->gc, struct txn_stmt); @@ -89,12 +89,6 @@ txn_stmt_new(struct txn *txn) stmt->new_tuple = NULL; stmt->engine_savepoint = NULL; stmt->row = NULL; - - /* Set the savepoint for statement rollback. */ - txn->sub_stmt_begin[txn->in_sub_stmt] = stailq_last(&txn->stmts); - txn->in_sub_stmt++; - - stailq_add_tail_entry(&txn->stmts, stmt, next); return stmt; } @@ -106,13 +100,15 @@ txn_rollback_to_svp(struct txn *txn, struct stailq_entry *svp) stailq_cut_tail(&txn->stmts, svp, &rollback); stailq_reverse(&rollback); stailq_foreach_entry(stmt, &rollback, next) { - engine_rollback_statement(txn->engine, txn, stmt); + if (stmt->space != NULL) { + engine_rollback_statement(txn->engine, txn, stmt); + stmt->space = NULL; + } if (stmt->row != NULL) { assert(txn->n_rows > 0); txn->n_rows--; stmt->row = NULL; } - stmt->space = NULL; } } @@ -145,7 +141,6 @@ int txn_begin_in_engine(struct engine *engine, struct txn *txn) { if (txn->engine == NULL) { - assert(stailq_empty(&txn->stmts)); txn->engine = engine; return engine_begin(engine, txn); } else if (txn->engine != engine) { @@ -179,11 +174,17 @@ txn_begin_stmt(struct space *space) if (txn_begin_in_engine(engine, txn) != 0) goto fail; - struct txn_stmt *stmt = txn_stmt_new(txn); + struct txn_stmt *stmt = txn_stmt_new(); if (stmt == NULL) goto fail; stmt->space = space; + /* Set the savepoint for statement rollback. */ + txn->sub_stmt_begin[txn->in_sub_stmt] = stailq_last(&txn->stmts); + txn->in_sub_stmt++; + + stailq_add_tail_entry(&txn->stmts, stmt, next); + if (engine_begin_statement(engine, txn) != 0) { txn_rollback_stmt(); return NULL; @@ -238,6 +239,35 @@ fail: return -1; } +int +txn_commit_nop(struct request *request) +{ + assert(request->header != NULL); + + struct txn *txn = in_txn(); + if (txn == NULL) { + txn = txn_begin(true); + if (txn == NULL) + return -1; + } + + struct txn_stmt *stmt = txn_stmt_new(); + if (stmt == NULL) + goto fail; + if (txn_add_redo(stmt, request) != 0) + goto fail; + + txn->n_rows++; + stailq_add_tail_entry(&txn->stmts, stmt, next); + + if (txn->is_autocommit && txn->in_sub_stmt == 0) + return txn_commit(txn); + return 0; +fail: + if (txn->is_autocommit && txn->in_sub_stmt == 0) + txn_rollback(); + return -1; +} static int64_t txn_write_to_wal(struct txn *txn) @@ -287,32 +317,31 @@ txn_commit(struct txn *txn) { assert(txn == in_txn()); - assert(stailq_empty(&txn->stmts) || txn->engine); - /* Do transaction conflict resolving */ - if (txn->engine) { - if (engine_prepare(txn->engine, txn) != 0) - goto fail; + if (txn->engine != NULL && + engine_prepare(txn->engine, txn) != 0) + goto fail; - if (txn->n_rows > 0) { - txn->signature = txn_write_to_wal(txn); - if (txn->signature < 0) - goto fail; - } - /* - * The transaction is in the binary log. No action below - * may throw. In case an error has happened, there is - * no other option but terminate. - */ - if (txn->has_triggers && - trigger_run(&txn->on_commit, txn) != 0) { - diag_log(); - unreachable(); - panic("commit trigger failed"); - } + if (txn->n_rows > 0) { + txn->signature = txn_write_to_wal(txn); + if (txn->signature < 0) + goto fail; + } + /* + * The transaction is in the binary log. No action below + * may throw. In case an error has happened, there is + * no other option but terminate. + */ + if (txn->has_triggers && + trigger_run(&txn->on_commit, txn) != 0) { + diag_log(); + unreachable(); + panic("commit trigger failed"); + } + if (txn->engine != NULL) engine_commit(txn->engine, txn); - } + TRASH(txn); /** Free volatile txn memory. */ fiber_gc(); @@ -470,7 +499,7 @@ box_txn_rollback_to_savepoint(box_txn_savepoint_t *svp) } struct txn_stmt *stmt = svp->stmt == NULL ? NULL : stailq_entry(svp->stmt, struct txn_stmt, next); - if (stmt != NULL && stmt->space == NULL) { + if (stmt != NULL && stmt->space == NULL && stmt->row == NULL) { /* * The statement at which this savepoint was * created has been rolled back. diff --git a/src/box/txn.h b/src/box/txn.h index f3d690be..b61adc33 100644 --- a/src/box/txn.h +++ b/src/box/txn.h @@ -253,6 +253,18 @@ void txn_rollback_stmt(); /** + * Commit a no-op request. + * + * A no-op request does not affect any space, but it + * promotes vclock and is written to WAL. + * + * This function can be called both as a part of an + * independent transaction and in autocommit mode. + */ +int +txn_commit_nop(struct request *request); + +/** * Raise an error if this is a multi-statement * transaction: DDL can not be part of a multi-statement * transaction and must be run in autocommit mode. diff --git a/test/box/before_replace.result b/test/box/before_replace.result index 4f47b9fa..2b6c1801 100644 --- a/test/box/before_replace.result +++ b/test/box/before_replace.result @@ -660,9 +660,9 @@ row.HEADER.type --- - NOP ... -row.BODY.space_id == s.id +#row.BODY --- -- true +- 0 ... -- gh-3128 before_replace with run_triggers s2 = box.schema.space.create("test2") diff --git a/test/box/before_replace.test.lua b/test/box/before_replace.test.lua index 22733c1d..9b4f49cf 100644 --- a/test/box/before_replace.test.lua +++ b/test/box/before_replace.test.lua @@ -221,7 +221,7 @@ path = fio.pathjoin(box.cfg.wal_dir, string.format('%020d.xlog', box.info.lsn - fun, param, state = xlog.pairs(path) state, row = fun(param, state) row.HEADER.type -row.BODY.space_id == s.id +#row.BODY -- gh-3128 before_replace with run_triggers s2 = box.schema.space.create("test2") -- 2.11.0