From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from localhost (localhost [127.0.0.1]) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id 2EF5C2E342 for ; Sun, 9 Jun 2019 16:44:58 -0400 (EDT) Received: from turing.freelists.org ([127.0.0.1]) by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id SWvC_MCgMgEF for ; Sun, 9 Jun 2019 16:44:58 -0400 (EDT) Received: from smtp39.i.mail.ru (smtp39.i.mail.ru [94.100.177.99]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id B5CE02E1C0 for ; Sun, 9 Jun 2019 16:44:57 -0400 (EDT) From: Georgy Kirichenko Subject: [tarantool-patches] [PATCH v3 11/14] txn: introduce asynchronous txn commit Date: Sun, 9 Jun 2019 23:44:40 +0300 Message-Id: <697f3313f6ad706a71c74ec259c0ea37d2702184.1560112747.git.georgy@tarantool.org> In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Sender: tarantool-patches-bounce@freelists.org Errors-to: tarantool-patches-bounce@freelists.org Reply-To: tarantool-patches@freelists.org List-Help: List-Unsubscribe: List-software: Ecartis version 1.0.0 List-Id: tarantool-patches List-Subscribe: List-Owner: List-post: List-Archive: To: tarantool-patches@freelists.org Cc: Georgy Kirichenko Allow asynchronous transaction commit. 
This adds two functions: * txn_write that sends a transaction to a journal * txn_wait that waits until the transaction processing was done Prerequisites: #1254 --- src/box/box.cc | 24 ++--- src/box/journal.c | 22 +++-- src/box/journal.h | 40 +++++---- src/box/txn.c | 220 +++++++++++++++++++++++++++++++--------------- src/box/txn.h | 12 +++ src/box/vy_log.c | 2 +- src/box/wal.c | 8 ++ 7 files changed, 221 insertions(+), 107 deletions(-) diff --git a/src/box/box.cc b/src/box/box.cc index d0616095b..510f3fc99 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -308,19 +308,21 @@ struct recovery_journal { * min/max LSN of created LSM levels. */ static int64_t -recovery_journal_async_write(struct journal *base, - struct journal_entry *entry) +recovery_journal_write(struct journal *base, + struct journal_entry *entry) { struct recovery_journal *journal = (struct recovery_journal *) base; entry->res = vclock_sum(journal->vclock); + if (entry->on_done_cb) + entry->on_done_cb(entry, entry->on_done_cb_data); entry->done = true; fiber_cond_broadcast(&entry->done_cond); return 0; } static int64_t -recovery_journal_async_wait(struct journal *base, - struct journal_entry *entry) +recovery_journal_wait(struct journal *base, + struct journal_entry *entry) { (void) base; assert(entry->done); @@ -328,20 +330,20 @@ recovery_journal_async_wait(struct journal *base, } static int64_t -recovery_journal_write(struct journal *base, - struct journal_entry *entry) +recovery_journal_write_sync(struct journal *base, + struct journal_entry *entry) { - if (recovery_journal_async_write(base, entry) == 0) - return recovery_journal_async_wait(base, entry); + if (recovery_journal_write(base, entry) == 0) + return recovery_journal_wait(base, entry); return -1; } static inline void recovery_journal_create(struct recovery_journal *journal, struct vclock *v) { - journal_create(&journal->base, recovery_journal_write, - recovery_journal_async_write, - recovery_journal_async_wait, + 
journal_create(&journal->base, recovery_journal_write_sync, + recovery_journal_write, + recovery_journal_wait, NULL); journal->vclock = v; } diff --git a/src/box/journal.c b/src/box/journal.c index b978e6752..dadff771e 100644 --- a/src/box/journal.c +++ b/src/box/journal.c @@ -37,17 +37,19 @@ * but txn_commit() must work. */ static int64_t -dummy_async_write(struct journal *journal, struct journal_entry *entry) +dummy_journal_write(struct journal *journal, struct journal_entry *entry) { (void) journal; entry->res = 0; + if (entry->on_done_cb) + entry->on_done_cb(entry, entry->on_done_cb_data); entry->done = true; fiber_cond_broadcast(&entry->done_cond); return 0; } static int64_t -dummy_async_wait(struct journal *journal, struct journal_entry *entry) +dummy_journal_wait(struct journal *journal, struct journal_entry *entry) { (void) journal; assert(entry->done); @@ -55,24 +57,26 @@ dummy_async_wait(struct journal *journal, struct journal_entry *entry) } static int64_t -dummy_journal_write(struct journal *journal, struct journal_entry *entry) +dummy_journal_write_sync(struct journal *journal, struct journal_entry *entry) { - if (dummy_async_write(journal, entry) == 0) - return dummy_async_wait(journal, entry); + if (dummy_journal_write(journal, entry) == 0) + return dummy_journal_wait(journal, entry); return -1; } static struct journal dummy_journal = { + dummy_journal_write_sync, dummy_journal_write, - dummy_async_write, - dummy_async_wait, + dummy_journal_wait, NULL, }; struct journal *current_journal = &dummy_journal; struct journal_entry * -journal_entry_new(size_t n_rows, struct region *region) +journal_entry_new(size_t n_rows, struct region *region, + void (*on_done_cb)(struct journal_entry *entry, void *data), + void *on_done_cb_data) { struct journal_entry *entry; @@ -90,6 +94,8 @@ journal_entry_new(size_t n_rows, struct region *region) entry->res = -1; entry->done = false; fiber_cond_create(&entry->done_cond); + entry->on_done_cb = on_done_cb; + 
entry->on_done_cb_data = on_done_cb_data; return entry; } diff --git a/src/box/journal.h b/src/box/journal.h index e7fe9154a..c7e467969 100644 --- a/src/box/journal.h +++ b/src/box/journal.h @@ -63,6 +63,14 @@ struct journal_entry { * Condition to broadcast when processing is done. */ struct fiber_cond done_cond; + /** + * A journal entry completion callback. + */ + void (*on_done_cb)(struct journal_entry *entry, void *data); + /** + * A journal entry completion callback argument. + */ + void *on_done_cb_data; /** * Approximate size of this request when encoded. */ @@ -85,7 +93,9 @@ struct region; * @return NULL if out of memory, fiber diagnostics area is set */ struct journal_entry * -journal_entry_new(size_t n_rows, struct region *region); +journal_entry_new(size_t n_rows, struct region *region, + void (*on_done_cb)(struct journal_entry *entry, void *data), + void *on_done_cb_data); /** * An API for an abstract journal for all transactions of this @@ -93,12 +103,12 @@ journal_entry_new(size_t n_rows, struct region *region); * synchronous replication. */ struct journal { + int64_t (*write_sync)(struct journal *journal, + struct journal_entry *req); int64_t (*write)(struct journal *journal, struct journal_entry *req); - int64_t (*async_write)(struct journal *journal, - struct journal_entry *req); - int64_t (*async_wait)(struct journal *journal, - struct journal_entry *req); + int64_t (*wait)(struct journal *journal, + struct journal_entry *req); void (*destroy)(struct journal *journal); }; @@ -115,9 +125,9 @@ extern struct journal *current_journal; * or -1 on error. */ static inline int64_t -journal_write(struct journal_entry *entry) +journal_write_sync(struct journal_entry *entry) { - return current_journal->write(current_journal, entry); + return current_journal->write_sync(current_journal, entry); } /** @@ -126,9 +136,9 @@ journal_write(struct journal_entry *entry) * @return 0 if write was scheduled or -1 on error. 
*/ static inline int64_t -journal_async_write(struct journal_entry *entry) +journal_write(struct journal_entry *entry) { - return current_journal->async_write(current_journal, entry); + return current_journal->write(current_journal, entry); } /** @@ -137,9 +147,9 @@ journal_async_write(struct journal_entry *entry) * or -1 on error. */ static inline int64_t -journal_async_wait(struct journal_entry *entry) +journal_wait(struct journal_entry *entry) { - return current_journal->async_wait(current_journal, entry); + return current_journal->wait(current_journal, entry); } /** @@ -173,14 +183,14 @@ journal_set(struct journal *new_journal) static inline void journal_create(struct journal *journal, + int64_t (*write_sync)(struct journal *, struct journal_entry *), int64_t (*write)(struct journal *, struct journal_entry *), - int64_t (*async_write)(struct journal *, struct journal_entry *), - int64_t (*async_wait)(struct journal *, struct journal_entry *), + int64_t (*wait)(struct journal *, struct journal_entry *), void (*destroy)(struct journal *)) { + journal->write_sync = write_sync; journal->write = write; - journal->async_write = async_write, - journal->async_wait = async_wait, + journal->wait = wait; journal->destroy = destroy; } diff --git a/src/box/txn.c b/src/box/txn.c index a08652af1..815d635fe 100644 --- a/src/box/txn.c +++ b/src/box/txn.c @@ -199,6 +199,7 @@ txn_begin() txn->engine = NULL; txn->engine_tx = NULL; txn->psql_txn = NULL; + txn->entry = NULL; /* fiber_on_yield/fiber_on_stop initialized by engine on demand */ fiber_set_txn(fiber(), txn); return txn; @@ -318,22 +319,77 @@ fail: return -1; } +/** + * Complete transaction processing. + */ +static void +txn_complete(struct txn *txn) +{ + if (txn->signature < 0) { + if (txn->engine) + engine_rollback(txn->engine, txn); + /* Rollback triggers must not throw. 
*/ + fiber_set_txn(fiber(), txn); + if (txn->has_triggers && + trigger_run(&txn->on_rollback, txn) != 0) { + diag_log(); + unreachable(); + panic("rollback trigger failed"); + } + fiber_set_txn(fiber(), NULL); + + return; + } + /* + * Engine can be NULL if transaction contains IPROTO_NOP + * statements only. + */ + if (txn->engine != NULL) + engine_commit(txn->engine, txn); + /* + * The transaction is in the binary log. No action below + * may throw. In case an error has happened, there is + * no other option but terminate. + */ + fiber_set_txn(fiber(), txn); + if (txn->has_triggers && + trigger_run(&txn->on_commit, txn) != 0) { + diag_log(); + unreachable(); + panic("commit trigger failed"); + } + + fiber_set_txn(fiber(), NULL); +} + +static void +txn_entry_done_cb(struct journal_entry *entry, void *data) +{ + struct txn *txn = (struct txn *)data; + assert(txn->entry == entry); + txn->signature = entry->res; + txn_complete(txn); +} + static int64_t -txn_write_to_wal(struct txn *txn) +txn_journal_write(struct txn *txn) { + assert(txn->entry == NULL); assert(txn->n_new_rows + txn->n_applier_rows > 0); - struct journal_entry *req = journal_entry_new(txn->n_new_rows + - txn->n_applier_rows, - &txn->region); - if (req == NULL) { + /* Prepare a journal entry. */ + txn->entry = journal_entry_new(txn->n_new_rows + + txn->n_applier_rows, + &txn->region, + txn_entry_done_cb, txn); + if (txn->entry == NULL) { txn_rollback(txn); return -1; } struct txn_stmt *stmt; - struct xrow_header **remote_row = req->rows; - struct xrow_header **local_row = req->rows + txn->n_applier_rows; + struct xrow_header **remote_row = txn->entry->rows; + struct xrow_header **local_row = txn->entry->rows + txn->n_applier_rows; stailq_foreach_entry(stmt, &txn->stmts, next) { if (stmt->row == NULL) continue; /* A read (e.g. 
select) request */ @@ -341,42 +397,39 @@ txn_write_to_wal(struct txn *txn) *local_row++ = stmt->row; else *remote_row++ = stmt->row; - req->approx_len += xrow_approx_len(stmt->row); + txn->entry->approx_len += xrow_approx_len(stmt->row); } - assert(remote_row == req->rows + txn->n_applier_rows); + assert(remote_row == txn->entry->rows + txn->n_applier_rows); assert(local_row == remote_row + txn->n_new_rows); - ev_tstamp start = ev_monotonic_now(loop()); - int64_t res = journal_write(req); - ev_tstamp stop = ev_monotonic_now(loop()); - - if (res < 0) { - /* Cascading rollback. */ - txn_rollback(txn); /* Perform our part of cascading rollback. */ - /* - * Move fiber to end of event loop to avoid - * execution of any new requests before all - * pending rollbacks are processed. - */ - fiber_reschedule(); + /* Send entry to a journal. */ + if (journal_write(txn->entry) < 0) { diag_set(ClientError, ER_WAL_IO); - diag_log(); - } else if (stop - start > too_long_threshold) { - int n_rows = txn->n_new_rows + txn->n_applier_rows; - say_warn_ratelimited("too long WAL write: %d rows at " - "LSN %lld: %.3f sec", n_rows, - res - n_rows + 1, stop - start); + return -1; } - /* - * Use vclock_sum() from WAL writer as transaction signature. - */ - return res; + return 0; } -int -txn_commit(struct txn *txn) +/* + * Wait until journal processing is finished. + */ +static int64_t +txn_journal_wait(struct txn *txn) +{ + assert(txn->entry != NULL); + int64_t signature = journal_wait(txn->entry); + assert(signature == txn->signature); + if (signature < 0) + diag_set(ClientError, ER_WAL_IO); + return signature; +} + +/* + * Prepare a transaction using engines. + */ +static int +txn_prepare(struct txn *txn) { - assert(txn == in_txn()); /* * If transaction has been started in SQL, deferred * foreign key constraints must not be violated.
@@ -386,7 +439,7 @@ txn_commit(struct txn *txn) struct sql_txn *sql_txn = txn->psql_txn; if (sql_txn->fk_deferred_count != 0) { diag_set(ClientError, ER_FOREIGN_KEY_CONSTRAINT); - goto fail; + return -1; } } /* @@ -394,42 +447,75 @@ txn_commit(struct txn *txn) * we have a bunch of IPROTO_NOP statements. */ if (txn->engine != NULL) { - if (engine_prepare(txn->engine, txn) != 0) - goto fail; + if (engine_prepare(txn->engine, txn) != 0) { + return -1; + } } + return 0; +} - if (txn->n_new_rows + txn->n_applier_rows > 0) { - txn->signature = txn_write_to_wal(txn); - if (txn->signature < 0) - return -1; +/* + * Send a transaction to a journal. + */ +int +txn_write(struct txn *txn) +{ + if (txn_prepare(txn) != 0) + goto fail; + + txn->start_tm = ev_monotonic_now(loop()); + if (txn->n_new_rows + txn->n_applier_rows == 0) { + /* Nothing to do. */ + txn->signature = 0; + txn_complete(txn); + return 0; } - /* - * Engine can be NULL if transaction contains IPROTO_NOP - * statements only. - */ - if (txn->engine != NULL) - engine_commit(txn->engine, txn); - /* - * The transaction is in the binary log. No action below - * may throw. In case an error has happened, there is - * no other option but terminate. - */ - if (txn->has_triggers && - trigger_run(&txn->on_commit, txn) != 0) { - diag_log(); - unreachable(); - panic("commit trigger failed"); + + if (txn_journal_write(txn) != 0) + goto fail; + fiber_set_txn(fiber(), NULL); + return 0; +fail: + txn_rollback(txn); + return -1; +} + +/* + * Wait until transaction processing is finished.
*/ +int +txn_wait(struct txn *txn) +{ + if (txn->n_new_rows + txn->n_applier_rows > 0 && + txn_journal_wait(txn) < 0) + goto fail; + ev_tstamp stop_tm = ev_monotonic_now(loop()); + if (stop_tm - txn->start_tm > too_long_threshold) { + int n_rows = txn->n_new_rows + txn->n_applier_rows; + say_warn_ratelimited("too long WAL write: %d rows at " + "LSN %lld: %.3f sec", n_rows, + txn->signature - n_rows + 1, + stop_tm - txn->start_tm); } - fiber_set_txn(fiber(), NULL); txn_free(txn); return 0; + fail: - txn_rollback(txn); + txn_free(txn); return -1; } +int +txn_commit(struct txn *txn) +{ + if (txn_write(txn) != 0 || + txn_wait(txn) < 0) + return -1; + return 0; +} + void txn_rollback_stmt(struct txn *txn) { @@ -442,18 +528,8 @@ txn_rollback_stmt(struct txn *txn) void txn_rollback(struct txn *txn) { - assert(txn == in_txn()); - if (txn->engine) - engine_rollback(txn->engine, txn); - /* Rollback triggers must not throw. */ - if (txn->has_triggers && - trigger_run(&txn->on_rollback, txn) != 0) { - diag_log(); - unreachable(); - panic("rollback trigger failed"); - } - - fiber_set_txn(fiber(), NULL); + txn->signature = -1; + txn_complete(txn); txn_free(txn); } diff --git a/src/box/txn.h b/src/box/txn.h index d211e5012..84d4f27d3 100644 --- a/src/box/txn.h +++ b/src/box/txn.h @@ -195,6 +195,12 @@ struct txn { /** Commit and rollback triggers */ struct rlist on_commit, on_rollback; struct sql_txn *psql_txn; + /** Journal entry to control txn write. */ + struct journal_entry *entry; + /** Transaction completion trigger. */ + struct trigger entry_done; + /** Timestamp of entry write start. */ + ev_tstamp start_tm; }; /* Pointer to the current transaction (if any) */ @@ -228,6 +234,12 @@ txn_commit(struct txn *txn); void txn_rollback(struct txn *txn); +int +txn_write(struct txn *txn); + +int +txn_wait(struct txn *txn); + /** * Roll back the transaction but keep the object around. * A special case for memtx transaction abort on yield.
In this diff --git a/src/box/vy_log.c b/src/box/vy_log.c index bdc1cfa31..d37d44011 100644 --- a/src/box/vy_log.c +++ b/src/box/vy_log.c @@ -772,7 +772,7 @@ vy_log_flush(void) tx_size++; size_t used = region_used(&fiber()->gc); - struct journal_entry *entry = journal_entry_new(tx_size, &fiber()->gc); + struct journal_entry *entry = journal_entry_new(tx_size, &fiber()->gc, NULL, NULL); if (entry == NULL) goto err; diff --git a/src/box/wal.c b/src/box/wal.c index e868a8e71..eff48b4fe 100644 --- a/src/box/wal.c +++ b/src/box/wal.c @@ -272,6 +272,8 @@ tx_schedule_f(va_list ap) struct journal_entry *req = stailq_shift_entry(&writer->schedule_queue, struct journal_entry, fifo); + if (req->on_done_cb != NULL) + req->on_done_cb(req, req->on_done_cb_data); req->done = true; fiber_cond_broadcast(&req->done_cond); } @@ -1184,6 +1186,8 @@ wal_async_write(struct journal *journal, struct journal_entry *entry) say_error("Aborting transaction %llu during " "cascading rollback", vclock_sum(&writer->vclock)); + if (entry->on_done_cb) + entry->on_done_cb(entry, entry->on_done_cb_data); entry->done = true; fiber_cond_broadcast(&entry->done_cond); return -1; @@ -1200,6 +1204,8 @@ wal_async_write(struct journal *journal, struct journal_entry *entry) if (batch == NULL) { diag_set(OutOfMemory, sizeof(struct wal_msg), "region", "struct wal_msg"); + if (entry->on_done_cb) + entry->on_done_cb(entry, entry->on_done_cb_data); entry->done = true; fiber_cond_broadcast(&entry->done_cond); return -1; @@ -1253,6 +1259,8 @@ wal_async_write_in_wal_mode_none(struct journal *journal, vclock_merge(&writer->vclock, &vclock_diff); vclock_copy(&replicaset.vclock, &writer->vclock); entry->res = vclock_sum(&writer->vclock); + if (entry->on_done_cb) + entry->on_done_cb(entry, entry->on_done_cb_data); entry->done = true; fiber_cond_broadcast(&entry->done_cond); return 0; -- 2.21.0