From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from localhost (localhost [127.0.0.1]) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id A719831645 for ; Fri, 21 Jun 2019 17:48:26 -0400 (EDT) Received: from turing.freelists.org ([127.0.0.1]) by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id UbKWjWAdFT0B for ; Fri, 21 Jun 2019 17:48:26 -0400 (EDT) Received: from smtp51.i.mail.ru (smtp51.i.mail.ru [94.100.177.111]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id E44EB315DB for ; Fri, 21 Jun 2019 17:48:25 -0400 (EDT) From: Georgy Kirichenko Subject: [tarantool-patches] [PATCH v5 2/7] txn: get rid of autocommit from a txn structure Date: Sat, 22 Jun 2019 00:48:16 +0300 Message-Id: <2c43b7a73a703ed8e1b6a75c28905d4c49ea50e4.1561153472.git.georgy@tarantool.org> In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Sender: tarantool-patches-bounce@freelists.org Errors-to: tarantool-patches-bounce@freelists.org Reply-To: tarantool-patches@freelists.org List-Help: List-Unsubscribe: List-software: Ecartis version 1.0.0 List-Id: tarantool-patches List-Subscribe: List-Owner: List-post: List-Archive: To: tarantool-patches@freelists.org Cc: Georgy Kirichenko Move transaction auto start and auto commit behavior to the box level. >From now on, a transaction won't start and commit automatically without txn_begin/txn_commit invocations. This is a part of a bigger transaction refactoring in order to implement detachable transactions and a parallel applier. 
Prerequisites: #1254 --- src/box/applier.cc | 43 ++++++++++++++++++++++---- src/box/box.cc | 70 +++++++++++++++++++++++++++--------------- src/box/index.cc | 10 +++--- src/box/memtx_engine.c | 10 ++++-- src/box/memtx_space.c | 6 ++-- src/box/sql.c | 2 +- src/box/txn.c | 52 +++++++++++-------------------- src/box/txn.h | 16 +++------- src/box/vy_scheduler.c | 6 ++-- 9 files changed, 126 insertions(+), 89 deletions(-) diff --git a/src/box/applier.cc b/src/box/applier.cc index 373e1feb9..d12a835d0 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -172,11 +172,26 @@ applier_writer_f(va_list ap) static int apply_initial_join_row(struct xrow_header *row) { + struct txn *txn = txn_begin(); + if (txn == NULL) + return -1; struct request request; xrow_decode_dml(row, &request, dml_request_key_map(row->type)); - struct space *space = space_cache_find_xc(request.space_id); + struct space *space = space_cache_find(request.space_id); + if (space == NULL) + goto rollback; /* no access checks here - applier always works with admin privs */ - return space_apply_initial_join_row(space, &request); + if (space_apply_initial_join_row(space, &request)) + goto rollback; + int rc; + rc = txn_commit(txn); + if (rc < 0) + return -1; + fiber_gc(); + return rc; +rollback: + txn_rollback(); + return -1; } /** @@ -189,8 +204,8 @@ static int process_nop(struct request *request) { assert(request->type == IPROTO_NOP); - struct txn *txn = txn_begin_stmt(NULL); - if (txn == NULL) + struct txn *txn = in_txn(); + if (txn_begin_stmt(txn, NULL) != 0) return -1; return txn_commit_stmt(txn, request); } @@ -213,6 +228,22 @@ apply_row(struct xrow_header *row) return 0; } +static int +apply_final_join_row(struct xrow_header *row) +{ + struct txn *txn = txn_begin(); + if (txn == NULL) + return -1; + if (apply_row(row) != 0) { + txn_rollback(); + return -1; + } + if (txn_commit(txn) != 0) + return -1; + fiber_gc(); + return 0; +} + /** * Connect to a remote host and authenticate the client. 
*/ @@ -403,7 +434,7 @@ applier_join(struct applier *applier) applier->last_row_time = ev_monotonic_now(loop()); if (iproto_type_is_dml(row.type)) { vclock_follow_xrow(&replicaset.vclock, &row); - if (apply_row(&row) != 0) + if (apply_final_join_row(&row) != 0) diag_raise(); if (++row_count % 100000 == 0) say_info("%.1fM rows received", row_count / 1e6); @@ -555,7 +586,7 @@ applier_apply_tx(struct stailq *rows) * conflict safely access failed xrow object and allocate * IPROTO_NOP on gc. */ - struct txn *txn = txn_begin(false); + struct txn *txn = txn_begin(); struct applier_tx_row *item; if (txn == NULL) diag_raise(); diff --git a/src/box/box.cc b/src/box/box.cc index 57419ee01..a32f1ba0f 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -169,34 +169,56 @@ int box_process_rw(struct request *request, struct space *space, struct tuple **result) { + struct tuple *tuple = NULL; + bool return_tuple = false; + struct txn *txn = in_txn(); + bool is_autocommit = txn == NULL; + if (is_autocommit && (txn = txn_begin()) == NULL) + return -1; assert(iproto_type_is_dml(request->type)); rmean_collect(rmean_box, request->type, 1); if (access_check_space(space, PRIV_W) != 0) - return -1; - struct txn *txn = txn_begin_stmt(space); - if (txn == NULL) - return -1; - struct tuple *tuple; + goto rollback; + if (txn_begin_stmt(txn, space) != 0) + goto rollback; if (space_execute_dml(space, txn, request, &tuple) != 0) { - txn_rollback_stmt(); - return -1; + txn_rollback_stmt(txn); + goto rollback; } - if (result == NULL) - return txn_commit_stmt(txn, request); - *result = tuple; - if (tuple == NULL) - return txn_commit_stmt(txn, request); - /* - * Pin the tuple locally before the commit, - * otherwise it may go away during yield in - * when WAL is written in autocommit mode. 
- */ - tuple_ref(tuple); - int rc = txn_commit_stmt(txn, request); - if (rc == 0) + if (result != NULL) + *result = tuple; + + return_tuple = result != NULL && tuple != NULL; + if (return_tuple) { + /* + * Pin the tuple locally before the commit, + * otherwise it may go away during yield in + * when WAL is written in autocommit mode. + */ + tuple_ref(tuple); + } + + if (txn_commit_stmt(txn, request)) + goto rollback; + + if (is_autocommit) { + if (txn_commit(txn) != 0) + goto error; + fiber_gc(); + } + if (return_tuple) { tuple_bless(tuple); - tuple_unref(tuple); - return rc; + tuple_unref(tuple); + } + return 0; + +rollback: + if (is_autocommit) + txn_rollback(); +error: + if (return_tuple) + tuple_unref(tuple); + return -1; } void @@ -1055,7 +1077,7 @@ box_select(uint32_t space_id, uint32_t index_id, struct iterator *it = index_create_iterator(index, type, key, part_count); if (it == NULL) { - txn_rollback_stmt(); + txn_rollback_stmt(txn); return -1; } @@ -1080,7 +1102,7 @@ box_select(uint32_t space_id, uint32_t index_id, if (rc != 0) { port_destroy(port); - txn_rollback_stmt(); + txn_rollback_stmt(txn); return -1; } txn_commit_ro_stmt(txn); diff --git a/src/box/index.cc b/src/box/index.cc index 4a444e5d0..7f26c9bc2 100644 --- a/src/box/index.cc +++ b/src/box/index.cc @@ -240,7 +240,7 @@ box_index_get(uint32_t space_id, uint32_t index_id, const char *key, if (txn_begin_ro_stmt(space, &txn) != 0) return -1; if (index_get(index, key, part_count, result) != 0) { - txn_rollback_stmt(); + txn_rollback_stmt(txn); return -1; } txn_commit_ro_stmt(txn); @@ -274,7 +274,7 @@ box_index_min(uint32_t space_id, uint32_t index_id, const char *key, if (txn_begin_ro_stmt(space, &txn) != 0) return -1; if (index_min(index, key, part_count, result) != 0) { - txn_rollback_stmt(); + txn_rollback_stmt(txn); return -1; } txn_commit_ro_stmt(txn); @@ -306,7 +306,7 @@ box_index_max(uint32_t space_id, uint32_t index_id, const char *key, if (txn_begin_ro_stmt(space, &txn) != 0) return -1; if 
(index_max(index, key, part_count, result) != 0) { - txn_rollback_stmt(); + txn_rollback_stmt(txn); return -1; } txn_commit_ro_stmt(txn); @@ -340,7 +340,7 @@ box_index_count(uint32_t space_id, uint32_t index_id, int type, return -1; ssize_t count = index_count(index, itype, key, part_count); if (count < 0) { - txn_rollback_stmt(); + txn_rollback_stmt(txn); return -1; } txn_commit_ro_stmt(txn); @@ -377,7 +377,7 @@ box_index_iterator(uint32_t space_id, uint32_t index_id, int type, struct iterator *it = index_create_iterator(index, itype, key, part_count); if (it == NULL) { - txn_rollback_stmt(); + txn_rollback_stmt(txn); return NULL; } txn_commit_ro_stmt(txn); diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c index cd763e547..dae9955b2 100644 --- a/src/box/memtx_engine.c +++ b/src/box/memtx_engine.c @@ -269,16 +269,22 @@ memtx_engine_recover_snapshot_row(struct memtx_engine *memtx, diag_set(ClientError, ER_CROSS_ENGINE_TRANSACTION); return -1; } + struct txn *txn = txn_begin(); + if (txn == NULL) + return -1; /* no access checks here - applier always works with admin privs */ - if (space_apply_initial_join_row(space, &request) != 0) + if (space_apply_initial_join_row(space, &request) != 0) { + txn_rollback(); return -1; + } + int rc = txn_commit(txn); /* * Don't let gc pool grow too much. Yet to * it before reading the next row, to make * sure it's not freed along here. */ fiber_gc(); - return 0; + return rc; } /** Called at start to tell memtx to recover to a given LSN. 
*/ diff --git a/src/box/memtx_space.c b/src/box/memtx_space.c index 78a0059a0..77a8f7d5f 100644 --- a/src/box/memtx_space.c +++ b/src/box/memtx_space.c @@ -325,8 +325,8 @@ memtx_space_apply_initial_join_row(struct space *space, struct request *request) return -1; } request->header->replica_id = 0; - struct txn *txn = txn_begin_stmt(space); - if (txn == NULL) + struct txn *txn = in_txn(); + if (txn_begin_stmt(txn, space) != 0) return -1; struct txn_stmt *stmt = txn_current_stmt(txn); stmt->new_tuple = memtx_tuple_new(space->format, request->tuple, @@ -341,7 +341,7 @@ memtx_space_apply_initial_join_row(struct space *space, struct request *request) rollback: say_error("rollback: %s", diag_last_error(diag_get())->errmsg); - txn_rollback_stmt(); + txn_rollback_stmt(txn); return -1; } diff --git a/src/box/sql.c b/src/box/sql.c index a0350da6b..4c9a4c15b 100644 --- a/src/box/sql.c +++ b/src/box/sql.c @@ -862,7 +862,7 @@ cursor_seek(BtCursor *pCur, int *pRes) part_count); if (it == NULL) { if (txn != NULL) - txn_rollback_stmt(); + txn_rollback_stmt(txn); pCur->eState = CURSOR_INVALID; return -1; } diff --git a/src/box/txn.c b/src/box/txn.c index 9aa460f50..583ae6ce0 100644 --- a/src/box/txn.c +++ b/src/box/txn.c @@ -178,7 +178,7 @@ txn_free(struct txn *txn) } struct txn * -txn_begin(bool is_autocommit) +txn_begin() { static int64_t tsn = 0; assert(! 
in_txn()); @@ -191,7 +191,6 @@ txn_begin(bool is_autocommit) txn->n_new_rows = 0; txn->n_local_rows = 0; txn->n_applier_rows = 0; - txn->is_autocommit = is_autocommit; txn->has_triggers = false; txn->is_aborted = false; txn->in_sub_stmt = 0; @@ -226,26 +225,20 @@ txn_begin_in_engine(struct engine *engine, struct txn *txn) return 0; } -struct txn * -txn_begin_stmt(struct space *space) +int +txn_begin_stmt(struct txn *txn, struct space *space) { - struct txn *txn = in_txn(); - if (txn == NULL) { - txn = txn_begin(true); - if (txn == NULL) - return NULL; - } else if (txn->in_sub_stmt > TXN_SUB_STMT_MAX) { + assert(txn == in_txn()); + assert(txn != NULL); + if (txn->in_sub_stmt > TXN_SUB_STMT_MAX) { diag_set(ClientError, ER_SUB_STMT_MAX); - return NULL; + return -1; } struct txn_stmt *stmt = txn_stmt_new(txn); - if (stmt == NULL) { - if (txn->is_autocommit && txn->in_sub_stmt == 0) - txn_rollback(); - return NULL; - } + if (stmt == NULL) + return -1; if (space == NULL) - return txn; + return 0; if (trigger_run(&space->on_stmt_begin, txn) != 0) goto fail; @@ -258,10 +251,10 @@ txn_begin_stmt(struct space *space) if (engine_begin_statement(engine, txn) != 0) goto fail; - return txn; + return 0; fail: - txn_rollback_stmt(); - return NULL; + txn_rollback_stmt(txn); + return -1; } bool @@ -278,8 +271,7 @@ txn_is_distributed(struct txn *txn) } /** - * End a statement. In autocommit mode, end - * the current transaction as well. + * End a statement. */ int txn_commit_stmt(struct txn *txn, struct request *request) @@ -339,14 +331,9 @@ txn_commit_stmt(struct txn *txn, struct request *request) goto fail; } --txn->in_sub_stmt; - if (txn->is_autocommit && txn->in_sub_stmt == 0) { - int rc = txn_commit(txn); - fiber_gc(); - return rc; - } return 0; fail: - txn_rollback_stmt(); + txn_rollback_stmt(txn); return -1; } @@ -382,7 +369,7 @@ txn_write_to_wal(struct txn *txn) if (res < 0) { /* Cascading rollback. */ - txn_rollback(); /* Perform our part of cascading rollback. 
*/ + txn_rollback(txn); /* Perform our part of cascading rollback. */ /* * Move fiber to end of event loop to avoid * execution of any new requests before all @@ -461,14 +448,11 @@ fail: } void -txn_rollback_stmt() +txn_rollback_stmt(struct txn *txn) { - struct txn *txn = in_txn(); if (txn == NULL || txn->in_sub_stmt == 0) return; txn->in_sub_stmt--; - if (txn->is_autocommit && txn->in_sub_stmt == 0) - return txn_rollback(); txn_rollback_to_svp(txn, txn->sub_stmt_begin[txn->in_sub_stmt]); } @@ -536,7 +520,7 @@ box_txn_begin() diag_set(ClientError, ER_ACTIVE_TRANSACTION); return -1; } - if (txn_begin(false) == NULL) + if (txn_begin() == NULL) return -1; return 0; } diff --git a/src/box/txn.h b/src/box/txn.h index f4d861824..33926f6f3 100644 --- a/src/box/txn.h +++ b/src/box/txn.h @@ -162,11 +162,6 @@ struct txn { * already assigned LSN. */ int n_applier_rows; - /** - * True if this transaction is running in autocommit mode - * (statement end causes an automatic transaction commit). - */ - bool is_autocommit; /** * True if the transaction was aborted so should be * rolled back at commit. @@ -214,7 +209,7 @@ in_txn() * @pre no transaction is active */ struct txn * -txn_begin(bool is_autocommit); +txn_begin(); /** * Commit a transaction. @@ -271,11 +266,10 @@ txn_on_rollback(struct txn *txn, struct trigger *trigger) } /** - * Start a new statement. If no current transaction, - * start a new transaction with autocommit = true. + * Start a new statement. */ -struct txn * -txn_begin_stmt(struct space *space); +int +txn_begin_stmt(struct txn *txn, struct space *space); int txn_begin_in_engine(struct engine *engine, struct txn *txn); @@ -334,7 +328,7 @@ txn_commit_stmt(struct txn *txn, struct request *request); * rolls back the entire transaction. 
*/ void -txn_rollback_stmt(); +txn_rollback_stmt(struct txn *txn); /** * Raise an error if this is a multi-statement transaction: DDL diff --git a/src/box/vy_scheduler.c b/src/box/vy_scheduler.c index 85c1659b0..ed8f7dd86 100644 --- a/src/box/vy_scheduler.c +++ b/src/box/vy_scheduler.c @@ -840,14 +840,14 @@ vy_deferred_delete_process_one(struct space *deferred_delete_space, tuple_unref(delete); - struct txn *txn = txn_begin_stmt(deferred_delete_space); - if (txn == NULL) + struct txn *txn = in_txn(); + if (txn_begin_stmt(txn, deferred_delete_space) != 0) return -1; struct tuple *unused; if (space_execute_dml(deferred_delete_space, txn, &request, &unused) != 0) { - txn_rollback_stmt(); + txn_rollback_stmt(txn); return -1; } return txn_commit_stmt(txn, &request); -- 2.22.0