From: Georgy Kirichenko <georgy@tarantool.org> To: tarantool-patches@freelists.org Cc: Georgy Kirichenko <georgy@tarantool.org> Subject: [tarantool-patches] [PATCH 2/2] Get rid of autocommit from a txn structure Date: Wed, 10 Apr 2019 10:22:26 +0300 [thread overview] Message-ID: <3b4ffe843914867d84b8b136ffb892bc580d8581.1554880565.git.georgy@tarantool.org> (raw) In-Reply-To: <cover.1554880565.git.georgy@tarantool.org> Move transaction auto start and auto commit behavior to box level. Now a transaction won't start or commit automatically without explicit txn_begin/txn_commit invocations. This is a part of a bigger transaction refactoring in order to implement detachable transactions and a parallel applier. Prerequisites: #1254 --- src/box/applier.cc | 27 +++++++++++++--- src/box/box.cc | 73 +++++++++++++++++++++++++++++++----------- src/box/memtx_engine.c | 10 ++++-- src/box/txn.c | 25 ++++----------- src/box/txn.h | 7 +--- src/box/vinyl.c | 12 +++---- 6 files changed, 97 insertions(+), 57 deletions(-) diff --git a/src/box/applier.cc b/src/box/applier.cc index 373e1feb9..3b74e0f54 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -172,11 +172,22 @@ applier_writer_f(va_list ap) static int apply_initial_join_row(struct xrow_header *row) { + struct txn *txn = txn_begin(); + if (txn == NULL) + return -1; struct request request; xrow_decode_dml(row, &request, dml_request_key_map(row->type)); - struct space *space = space_cache_find_xc(request.space_id); + struct space *space = space_cache_find(request.space_id); + if (space == NULL) + return -1; /* no access checks here - applier always works with admin privs */ - return space_apply_initial_join_row(space, &request); + if (space_apply_initial_join_row(space, &request)) { + txn_rollback(); + return -1; + } + int rc = txn_commit(txn); + fiber_gc(); + return rc; } /** @@ -403,8 +414,16 @@ applier_join(struct applier *applier) applier->last_row_time = ev_monotonic_now(loop()); if (iproto_type_is_dml(row.type)) { 
vclock_follow_xrow(&replicaset.vclock, &row); - if (apply_row(&row) != 0) + struct txn *txn = txn_begin(); + if (txn == NULL) + diag_raise(); + if (apply_row(&row) != 0) { + txn_rollback(); + diag_raise(); + } + if (txn_commit(txn) != 0) diag_raise(); + fiber_gc(); if (++row_count % 100000 == 0) say_info("%.1fM rows received", row_count / 1e6); } else if (row.type == IPROTO_OK) { @@ -555,7 +574,7 @@ applier_apply_tx(struct stailq *rows) * conflict safely access failed xrow object and allocate * IPROTO_NOP on gc. */ - struct txn *txn = txn_begin(false); + struct txn *txn = txn_begin(); struct applier_tx_row *item; if (txn == NULL) diag_raise(); diff --git a/src/box/box.cc b/src/box/box.cc index 7828f575b..6119ca078 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -169,34 +169,53 @@ int box_process_rw(struct request *request, struct space *space, struct tuple **result) { + int rc = -1; + struct tuple *tuple = NULL; + struct txn *txn = in_txn(); + bool autocommit = txn == NULL; + if (txn == NULL && (txn = txn_begin()) == NULL) + return -1; assert(iproto_type_is_dml(request->type)); rmean_collect(rmean_box, request->type, 1); if (access_check_space(space, PRIV_W) != 0) - return -1; - struct txn *txn = txn_begin_stmt(space); - if (txn == NULL) - return -1; - struct tuple *tuple; + goto fail; + if (txn_begin_stmt(space) == NULL) + goto fail; if (space_execute_dml(space, txn, request, &tuple) != 0) { txn_rollback_stmt(); - return -1; + goto fail; } - if (result == NULL) - return txn_commit_stmt(txn, request); - *result = tuple; - if (tuple == NULL) - return txn_commit_stmt(txn, request); /* * Pin the tuple locally before the commit, * otherwise it may go away during yield in * when WAL is written in autocommit mode. 
*/ - tuple_ref(tuple); - int rc = txn_commit_stmt(txn, request); - if (rc == 0) + if (tuple != NULL) + tuple_ref(tuple); + + if (result != NULL) + *result = tuple; + + if (txn_commit_stmt(txn, request) != 0 || + (autocommit && txn_commit(txn) != 0)) + goto fail; + if (autocommit) + fiber_gc(); + + if (tuple != NULL && result != NULL) { tuple_bless(tuple); - tuple_unref(tuple); + } + + rc = 0; +done: + if (tuple != NULL) + tuple_unref(tuple); return rc; + +fail: + if (autocommit) + txn_rollback(); + goto done; } void @@ -299,8 +318,12 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row) xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type)); if (request.type != IPROTO_NOP) { struct space *space = space_cache_find_xc(request.space_id); - if (box_process_rw(&request, space, NULL) != 0) { + struct txn *txn = txn_begin(); + if (txn == NULL || box_process_rw(&request, space, NULL) != 0 || + txn_commit(txn) != 0) { say_error("error applying row: %s", request_str(&request)); + if (txn != NULL) + txn_rollback(); diag_raise(); } } @@ -1313,8 +1336,15 @@ box_sequence_reset(uint32_t seq_id) static inline void box_register_replica(uint32_t id, const struct tt_uuid *uuid) { + struct txn *txn = txn_begin(); + if (txn == NULL) + diag_raise(); if (boxk(IPROTO_INSERT, BOX_CLUSTER_ID, "[%u%s]", - (unsigned) id, tt_uuid_str(uuid)) != 0) + (unsigned) id, tt_uuid_str(uuid)) != 0) { + txn_rollback(); + diag_raise(); + } + if (txn_commit(txn) != 0) diag_raise(); assert(replica_by_uuid(uuid)->id == id); } @@ -1636,9 +1666,16 @@ box_set_replicaset_uuid(const struct tt_uuid *replicaset_uuid) uu = *replicaset_uuid; else tt_uuid_create(&uu); + struct txn *txn = txn_begin(); + if (txn == NULL) + diag_raise(); /* Save replica set UUID in _schema */ if (boxk(IPROTO_INSERT, BOX_SCHEMA_ID, "[%s%s]", "cluster", - tt_uuid_str(&uu))) + tt_uuid_str(&uu))) { + txn_rollback(); + diag_raise(); + } + if (txn_commit(txn) != 0) diag_raise(); } diff --git a/src/box/memtx_engine.c 
b/src/box/memtx_engine.c index 924f8bbc4..f052657f7 100644 --- a/src/box/memtx_engine.c +++ b/src/box/memtx_engine.c @@ -264,16 +264,22 @@ memtx_engine_recover_snapshot_row(struct memtx_engine *memtx, diag_set(ClientError, ER_CROSS_ENGINE_TRANSACTION); return -1; } + struct txn *txn = txn_begin(); + if (txn == NULL) + return -1; /* no access checks here - applier always works with admin privs */ - if (space_apply_initial_join_row(space, &request) != 0) + if (space_apply_initial_join_row(space, &request) != 0) { + txn_rollback(); return -1; + } + int rc = txn_commit(txn); /* * Don't let gc pool grow too much. Yet to * it before reading the next row, to make * sure it's not freed along here. */ fiber_gc(); - return 0; + return rc; } /** Called at start to tell memtx to recover to a given LSN. */ diff --git a/src/box/txn.c b/src/box/txn.c index be5e209b6..2b44aba48 100644 --- a/src/box/txn.c +++ b/src/box/txn.c @@ -172,7 +172,7 @@ txn_free(struct txn *txn) } struct txn * -txn_begin(bool is_autocommit) +txn_begin() { static int64_t tsn = 0; assert(! in_txn()); @@ -185,7 +185,6 @@ txn_begin(bool is_autocommit) txn->n_new_rows = 0; txn->n_local_rows = 0; txn->n_applier_rows = 0; - txn->is_autocommit = is_autocommit; txn->has_triggers = false; txn->is_aborted = false; txn->in_sub_stmt = 0; @@ -223,19 +222,15 @@ txn_begin_stmt(struct space *space) { struct txn *txn = in_txn(); if (txn == NULL) { - txn = txn_begin(true); - if (txn == NULL) - return NULL; + diag_set(ClientError, ER_NO_TRANSACTION); + return NULL; } else if (txn->in_sub_stmt > TXN_SUB_STMT_MAX) { diag_set(ClientError, ER_SUB_STMT_MAX); return NULL; } struct txn_stmt *stmt = txn_stmt_new(txn); - if (stmt == NULL) { - if (txn->is_autocommit && txn->in_sub_stmt == 0) - txn_rollback(); + if (stmt == NULL) return NULL; - } if (space == NULL) return txn; @@ -270,8 +265,7 @@ txn_is_distributed(struct txn *txn) } /** - * End a statement. In autocommit mode, end - * the current transaction as well. 
+ * End a statement. */ int txn_commit_stmt(struct txn *txn, struct request *request) @@ -314,11 +308,6 @@ txn_commit_stmt(struct txn *txn, struct request *request) goto fail; } --txn->in_sub_stmt; - if (txn->is_autocommit && txn->in_sub_stmt == 0) { - int rc = txn_commit(txn); - fiber_gc(); - return rc; - } return 0; fail: txn_rollback_stmt(); @@ -444,8 +433,6 @@ txn_rollback_stmt() if (txn == NULL || txn->in_sub_stmt == 0) return; txn->in_sub_stmt--; - if (txn->is_autocommit && txn->in_sub_stmt == 0) - return txn_rollback(); txn_rollback_to_svp(txn, txn->sub_stmt_begin[txn->in_sub_stmt]); } @@ -516,7 +503,7 @@ box_txn_begin() diag_set(ClientError, ER_ACTIVE_TRANSACTION); return -1; } - if (txn_begin(false) == NULL) + if (txn_begin() == NULL) return -1; return 0; } diff --git a/src/box/txn.h b/src/box/txn.h index 84c82af92..a304e11d9 100644 --- a/src/box/txn.h +++ b/src/box/txn.h @@ -154,11 +154,6 @@ struct txn { * already assigned LSN. */ int n_applier_rows; - /** - * True if this transaction is running in autocommit mode - * (statement end causes an automatic transaction commit). - */ - bool is_autocommit; /** * True if the transaction was aborted so should be * rolled back at commit. @@ -206,7 +201,7 @@ in_txn() * @pre no transaction is active */ struct txn * -txn_begin(bool is_autocommit); +txn_begin(); /** * Commit a transaction. 
diff --git a/src/box/vinyl.c b/src/box/vinyl.c index 1e9937613..8a90cd1c9 100644 --- a/src/box/vinyl.c +++ b/src/box/vinyl.c @@ -2399,10 +2399,8 @@ vinyl_engine_begin(struct engine *engine, struct txn *txn) txn->engine_tx = vy_tx_begin(env->xm); if (txn->engine_tx == NULL) return -1; - if (!txn->is_autocommit) { - trigger_create(&txn->fiber_on_stop, txn_on_stop, NULL, NULL); - trigger_add(&fiber()->on_stop, &txn->fiber_on_stop); - } + trigger_create(&txn->fiber_on_stop, txn_on_stop, NULL, NULL); + trigger_add(&fiber()->on_stop, &txn->fiber_on_stop); return 0; } @@ -2472,8 +2470,7 @@ vinyl_engine_commit(struct engine *engine, struct txn *txn) vy_regulator_check_dump_watermark(&env->regulator); txn->engine_tx = NULL; - if (!txn->is_autocommit) - trigger_clear(&txn->fiber_on_stop); + trigger_clear(&txn->fiber_on_stop); } static void @@ -2487,8 +2484,7 @@ vinyl_engine_rollback(struct engine *engine, struct txn *txn) vy_tx_rollback(tx); txn->engine_tx = NULL; - if (!txn->is_autocommit) - trigger_clear(&txn->fiber_on_stop); + trigger_clear(&txn->fiber_on_stop); } static int -- 2.21.0
next prev parent reply other threads:[~2019-04-10 7:22 UTC|newest] Thread overview: 5+ messages / expand[flat|nested] mbox.gz Atom feed top 2019-04-10 7:22 [tarantool-patches] [PATCH 0/2] Transaction refactoring Georgy Kirichenko 2019-04-10 7:22 ` [tarantool-patches] [PATCH 1/2] Introduce a txn memory region Georgy Kirichenko 2019-04-10 10:11 ` [tarantool-patches] " Konstantin Osipov 2019-04-10 7:22 ` Georgy Kirichenko [this message] 2019-04-11 6:43 ` [tarantool-patches] Re: [PATCH 2/2] Get rid of autocommit from a txn structure Konstantin Osipov
Reply instructions: You may reply publicly to this message via plain-text email using any one of the following methods: * Save the following mbox file, import it into your mail client, and reply-to-all from there: mbox Avoid top-posting and favor interleaved quoting: https://en.wikipedia.org/wiki/Posting_style#Interleaved_style * Reply using the --to, --cc, and --in-reply-to switches of git-send-email(1): git send-email \ --in-reply-to=3b4ffe843914867d84b8b136ffb892bc580d8581.1554880565.git.georgy@tarantool.org \ --to=georgy@tarantool.org \ --cc=tarantool-patches@freelists.org \ --subject='Re: [tarantool-patches] [PATCH 2/2] Get rid of autocommit from a txn structure' \ /path/to/YOUR_REPLY https://kernel.org/pub/software/scm/git/docs/git-send-email.html * If your mail client supports setting the In-Reply-To header via mailto: links, try the mailto: link
This is a public inbox; see the mirroring instructions for how to clone and mirror all data and code used for this inbox