From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from localhost (localhost [127.0.0.1]) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id 5322D2CCB4 for ; Fri, 19 Apr 2019 08:44:14 -0400 (EDT) Received: from turing.freelists.org ([127.0.0.1]) by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id DLsUBvOG2Zj9 for ; Fri, 19 Apr 2019 08:44:14 -0400 (EDT) Received: from smtp50.i.mail.ru (smtp50.i.mail.ru [94.100.177.110]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id CB5362CCAC for ; Fri, 19 Apr 2019 08:44:13 -0400 (EDT) From: Georgy Kirichenko Subject: [tarantool-patches] [PATCH 06/10] Require for txn in case of txn_begin_stmt/txn_rollback_stmt Date: Fri, 19 Apr 2019 15:44:02 +0300 Message-Id: <4ff3a4db9b50f532e94b2e0dd0ae8c02100ebf7d.1555677159.git.georgy@tarantool.org> In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Sender: tarantool-patches-bounce@freelists.org Errors-to: tarantool-patches-bounce@freelists.org Reply-To: tarantool-patches@freelists.org List-Help: List-Unsubscribe: List-software: Ecartis version 1.0.0 List-Id: tarantool-patches List-Subscribe: List-Owner: List-post: List-Archive: To: tarantool-patches@freelists.org Cc: Georgy Kirichenko Pass a txn structure to txn_begin_stmt and txn_rollback_stmt functions. 
Prerequisites: #1254 --- src/box/applier.cc | 4 ++-- src/box/box.cc | 8 ++++---- src/box/index.cc | 10 +++++----- src/box/journal.c | 1 + src/box/journal.h | 16 +++++++++++++++ src/box/memtx_space.c | 8 ++++---- src/box/sql.c | 2 +- src/box/txn.c | 45 ++++++++++++++++++++++++------------------ src/box/txn.h | 9 ++++----- src/box/vy_scheduler.c | 6 +++--- src/box/wal.c | 5 ++++- 11 files changed, 70 insertions(+), 44 deletions(-) diff --git a/src/box/applier.cc b/src/box/applier.cc index 42b6efc4d..43b1a84bc 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -201,8 +201,8 @@ static int process_nop(struct request *request) { assert(request->type == IPROTO_NOP); - struct txn *txn = txn_begin_stmt(NULL); - if (txn == NULL) + struct txn *txn = in_txn(); + if (txn_begin_stmt(txn, NULL) == NULL) return -1; return txn_commit_stmt(txn, request); } diff --git a/src/box/box.cc b/src/box/box.cc index 835a00c95..46cd444fd 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -178,10 +178,10 @@ box_process_rw(struct request *request, struct space *space, rmean_collect(rmean_box, request->type, 1); if (access_check_space(space, PRIV_W) != 0) goto fail; - if (txn_begin_stmt(space) == NULL) + if (txn_begin_stmt(txn, space) == NULL) goto fail; if (space_execute_dml(space, txn, request, &tuple) != 0) { - txn_rollback_stmt(); + txn_rollback_stmt(txn); goto fail; } /* @@ -1087,7 +1087,7 @@ box_select(uint32_t space_id, uint32_t index_id, struct iterator *it = index_create_iterator(index, type, key, part_count); if (it == NULL) { - txn_rollback_stmt(); + txn_rollback_stmt(txn); return -1; } @@ -1112,7 +1112,7 @@ box_select(uint32_t space_id, uint32_t index_id, if (rc != 0) { port_destroy(port); - txn_rollback_stmt(); + txn_rollback_stmt(txn); return -1; } txn_commit_ro_stmt(txn); diff --git a/src/box/index.cc b/src/box/index.cc index 2817d076d..baee0c1cb 100644 --- a/src/box/index.cc +++ b/src/box/index.cc @@ -239,7 +239,7 @@ box_index_get(uint32_t space_id, uint32_t index_id, 
const char *key, if (txn_begin_ro_stmt(space, &txn) != 0) return -1; if (index_get(index, key, part_count, result) != 0) { - txn_rollback_stmt(); + txn_rollback_stmt(txn); return -1; } txn_commit_ro_stmt(txn); @@ -273,7 +273,7 @@ box_index_min(uint32_t space_id, uint32_t index_id, const char *key, if (txn_begin_ro_stmt(space, &txn) != 0) return -1; if (index_min(index, key, part_count, result) != 0) { - txn_rollback_stmt(); + txn_rollback_stmt(txn); return -1; } txn_commit_ro_stmt(txn); @@ -305,7 +305,7 @@ box_index_max(uint32_t space_id, uint32_t index_id, const char *key, if (txn_begin_ro_stmt(space, &txn) != 0) return -1; if (index_max(index, key, part_count, result) != 0) { - txn_rollback_stmt(); + txn_rollback_stmt(txn); return -1; } txn_commit_ro_stmt(txn); @@ -339,7 +339,7 @@ box_index_count(uint32_t space_id, uint32_t index_id, int type, return -1; ssize_t count = index_count(index, itype, key, part_count); if (count < 0) { - txn_rollback_stmt(); + txn_rollback_stmt(txn); return -1; } txn_commit_ro_stmt(txn); @@ -376,7 +376,7 @@ box_index_iterator(uint32_t space_id, uint32_t index_id, int type, struct iterator *it = index_create_iterator(index, itype, key, part_count); if (it == NULL) { - txn_rollback_stmt(); + txn_rollback_stmt(txn); return NULL; } txn_commit_ro_stmt(txn); diff --git a/src/box/journal.c b/src/box/journal.c index fe13fb6ee..5cffc7452 100644 --- a/src/box/journal.c +++ b/src/box/journal.c @@ -70,6 +70,7 @@ journal_entry_new(size_t n_rows, struct region *region) entry->n_rows = n_rows; entry->res = -1; entry->fiber = fiber(); + rlist_create(&entry->on_error); return entry; } diff --git a/src/box/journal.h b/src/box/journal.h index 8ac32ee5e..94be0b007 100644 --- a/src/box/journal.h +++ b/src/box/journal.h @@ -32,6 +32,7 @@ */ #include #include +#include "trigger.h" #include "salad/stailq.h" #if defined(__cplusplus) @@ -58,6 +59,12 @@ struct journal_entry { * The fiber issuing the request. 
*/ struct fiber *fiber; + /** + * A trigger list to call if write failed. Triggers are going to be + * fired before any other processing and are a good place to implement + * any rollback non-yielding behavior. + */ + struct rlist on_error; /** * Approximate size of this request when encoded. */ @@ -82,6 +89,15 @@ struct region; struct journal_entry * journal_entry_new(size_t n_rows, struct region *region); +/** + * Add an on_error trigger to a journal entry. + */ +static inline void +journal_entry_on_error(struct journal_entry *entry, struct trigger *trigger) +{ + trigger_add(&entry->on_error, trigger); +} + /** * An API for an abstract journal for all transactions of this * instance, as well as for multiple instances in case of diff --git a/src/box/memtx_space.c b/src/box/memtx_space.c index a28204d30..f2b28fb71 100644 --- a/src/box/memtx_space.c +++ b/src/box/memtx_space.c @@ -310,10 +310,10 @@ memtx_space_apply_initial_join_row(struct space *space, struct request *request) return -1; } request->header->replica_id = 0; - struct txn *txn = txn_begin_stmt(space); - if (txn == NULL) + struct txn *txn = in_txn(); + struct txn_stmt *stmt = txn_begin_stmt(txn, space); + if (stmt == NULL) return -1; - struct txn_stmt *stmt = txn_current_stmt(txn); stmt->new_tuple = memtx_tuple_new(space->format, request->tuple, request->tuple_end); if (stmt->new_tuple == NULL) @@ -326,7 +326,7 @@ memtx_space_apply_initial_join_row(struct space *space, struct request *request) rollback: say_error("rollback: %s", diag_last_error(diag_get())->errmsg); - txn_rollback_stmt(); + txn_rollback_stmt(txn); return -1; } diff --git a/src/box/sql.c b/src/box/sql.c index 1fb93e106..0051b93c0 100644 --- a/src/box/sql.c +++ b/src/box/sql.c @@ -911,7 +911,7 @@ cursor_seek(BtCursor *pCur, int *pRes) part_count); if (it == NULL) { if (txn != NULL) - txn_rollback_stmt(); + txn_rollback_stmt(txn); pCur->eState = CURSOR_INVALID; return SQL_TARANTOOL_ITERATOR_FAIL; } diff --git a/src/box/txn.c 
b/src/box/txn.c index fdfffa144..8f5b66480 100644 --- a/src/box/txn.c +++ b/src/box/txn.c @@ -218,14 +218,10 @@ txn_begin_in_engine(struct engine *engine, struct txn *txn) return 0; } -struct txn * -txn_begin_stmt(struct space *space) +struct txn_stmt * +txn_begin_stmt(struct txn *txn, struct space *space) { - struct txn *txn = in_txn(); - if (txn == NULL) { - diag_set(ClientError, ER_NO_TRANSACTION); - return NULL; - } else if (txn->in_sub_stmt > TXN_SUB_STMT_MAX) { + if (txn->in_sub_stmt > TXN_SUB_STMT_MAX) { diag_set(ClientError, ER_SUB_STMT_MAX); return NULL; } @@ -233,7 +229,7 @@ txn_begin_stmt(struct space *space) if (stmt == NULL) return NULL; if (space == NULL) - return txn; + return stmt; if (trigger_run(&space->on_stmt_begin, txn) != 0) goto fail; @@ -246,9 +242,9 @@ txn_begin_stmt(struct space *space) if (engine_begin_statement(engine, txn) != 0) goto fail; - return txn; + return stmt; fail: - txn_rollback_stmt(); + txn_rollback_stmt(txn); return NULL; } @@ -311,10 +307,23 @@ txn_commit_stmt(struct txn *txn, struct request *request) --txn->in_sub_stmt; return 0; fail: - txn_rollback_stmt(); + txn_rollback_stmt(txn); return -1; } +/* + * Callback called if journal write failed. + */ +static void +journal_write_error_cb(struct trigger *trigger, void *event) +{ + (void) event; + struct txn *txn = (struct txn *)trigger->data; + if (txn->engine) + engine_rollback(txn->engine, txn); + txn->engine = NULL; +} + static int64_t txn_write_to_wal(struct txn *txn) { @@ -326,6 +335,10 @@ txn_write_to_wal(struct txn *txn) if (req == NULL) return -1; + struct trigger on_error; + trigger_create(&on_error, journal_write_error_cb, txn, NULL); + journal_entry_on_error(req, &on_error); + struct txn_stmt *stmt; struct xrow_header **remote_row = req->rows; struct xrow_header **local_row = req->rows + txn->n_applier_rows; @@ -348,12 +361,6 @@ txn_write_to_wal(struct txn *txn) if (res < 0) { /* Cascading rollback. */ txn_rollback(txn); /* Perform our part of cascading rollback. 
*/ - /* - * Move fiber to end of event loop to avoid - * execution of any new requests before all - * pending rollbacks are processed. - */ - fiber_reschedule(); diag_set(ClientError, ER_WAL_IO); diag_log(); } else if (stop - start > too_long_threshold) { @@ -429,9 +436,8 @@ fail: } void -txn_rollback_stmt() +txn_rollback_stmt(struct txn *txn) { - struct txn *txn = in_txn(); if (txn == NULL || txn->in_sub_stmt == 0) return; txn->in_sub_stmt--; @@ -449,6 +455,7 @@ txn_rollback(struct txn *txn) unreachable(); panic("rollback trigger failed"); } + if (txn->engine) engine_rollback(txn->engine, txn); diff --git a/src/box/txn.h b/src/box/txn.h index ae2d7b9a5..96719638b 100644 --- a/src/box/txn.h +++ b/src/box/txn.h @@ -261,11 +261,10 @@ txn_on_rollback(struct txn *txn, struct trigger *trigger) } /** - * Start a new statement. If no current transaction, - * start a new transaction with autocommit = true. + * Start a new statement. */ -struct txn * -txn_begin_stmt(struct space *space); +struct txn_stmt * +txn_begin_stmt(struct txn *txn, struct space *space); int txn_begin_in_engine(struct engine *engine, struct txn *txn); @@ -324,7 +323,7 @@ txn_commit_stmt(struct txn *txn, struct request *request); * rolls back the entire transaction. 
*/ void -txn_rollback_stmt(); +txn_rollback_stmt(struct txn *txn); /** * Raise an error if this is a multi-statement transaction: DDL diff --git a/src/box/vy_scheduler.c b/src/box/vy_scheduler.c index 1f6b30f4a..053558c8c 100644 --- a/src/box/vy_scheduler.c +++ b/src/box/vy_scheduler.c @@ -840,14 +840,14 @@ vy_deferred_delete_process_one(struct space *deferred_delete_space, tuple_unref(delete); - struct txn *txn = txn_begin_stmt(deferred_delete_space); - if (txn == NULL) + struct txn *txn = in_txn(); + if (txn_begin_stmt(txn, deferred_delete_space) == NULL) return -1; struct tuple *unused; if (space_execute_dml(deferred_delete_space, txn, &request, &unused) != 0) { - txn_rollback_stmt(); + txn_rollback_stmt(txn); return -1; } return txn_commit_stmt(txn, &request); diff --git a/src/box/wal.c b/src/box/wal.c index ad8ff7c62..8dfa3ef27 100644 --- a/src/box/wal.c +++ b/src/box/wal.c @@ -300,7 +300,10 @@ tx_schedule_rollback(struct cmsg *msg) * in-memory database state. */ stailq_reverse(&writer->rollback); - /* Must not yield. */ + /* Call error callback for each request before any scheduling. */ + struct journal_entry *req; + stailq_foreach_entry(req, &writer->rollback, fifo) + trigger_run(&req->on_error, NULL); tx_schedule_queue(&writer->rollback); stailq_create(&writer->rollback); } -- 2.21.0