From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from localhost (localhost [127.0.0.1]) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id 85F0F213DA for ; Sun, 10 Mar 2019 16:21:34 -0400 (EDT) Received: from turing.freelists.org ([127.0.0.1]) by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id cUyTyBuGrmHf for ; Sun, 10 Mar 2019 16:21:34 -0400 (EDT) Received: from smtp40.i.mail.ru (smtp40.i.mail.ru [94.100.177.100]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id 40A05213B5 for ; Sun, 10 Mar 2019 16:21:34 -0400 (EDT) From: Georgy Kirichenko Subject: [tarantool-patches] [PATCH v3 1/2] Write rows without a lsn to the transaction tail Date: Sun, 10 Mar 2019 23:21:27 +0300 Message-Id: <48e024c9ada966ce68447a7cf24c201b1ebaf27b.1552248901.git.georgy@tarantool.org> In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Sender: tarantool-patches-bounce@freelists.org Errors-to: tarantool-patches-bounce@freelists.org Reply-To: tarantool-patches@freelists.org List-Help: List-Unsubscribe: List-software: Ecartis version 1.0.0 List-Id: tarantool-patches List-Subscribe: List-Owner: List-post: List-Archive: To: tarantool-patches@freelists.org Cc: Georgy Kirichenko Form a separate transaction with local changes in case of replication. This is important because we should be able to replicate such changes (e.g. those made within an on_replace trigger) back to the originator. Otherwise, local changes would be incorporated into the originating transaction and would be skipped by the originator replica. 
Needed for: #2798 --- src/box/txn.c | 42 +++++++++++++++++++++++++++++------------- src/box/txn.h | 6 ++++-- 2 files changed, 33 insertions(+), 15 deletions(-) diff --git a/src/box/txn.c b/src/box/txn.c index 1f488bbcc..d3d008c25 100644 --- a/src/box/txn.c +++ b/src/box/txn.c @@ -118,9 +118,13 @@ txn_rollback_to_svp(struct txn *txn, struct stailq_entry *svp) stailq_foreach_entry(stmt, &rollback, next) { if (txn->engine != NULL && stmt->space != NULL) engine_rollback_statement(txn->engine, txn, stmt); - if (stmt->row != NULL) { - assert(txn->n_rows > 0); - txn->n_rows--; + if (stmt->row != NULL && stmt->row->replica_id == 0) { + assert(txn->n_local_rows > 0); + txn->n_local_rows--; + } + if (stmt->row != NULL && stmt->row->replica_id != 0) { + assert(txn->n_remote_rows > 0); + txn->n_remote_rows--; } txn_stmt_unref_tuples(stmt); stmt->space = NULL; @@ -140,7 +144,8 @@ txn_begin(bool is_autocommit) } /* Initialize members explicitly to save time on memset() */ stailq_create(&txn->stmts); - txn->n_rows = 0; + txn->n_local_rows = 0; + txn->n_remote_rows = 0; txn->is_autocommit = is_autocommit; txn->has_triggers = false; txn->is_aborted = false; @@ -233,7 +238,11 @@ txn_commit_stmt(struct txn *txn, struct request *request) if (stmt->space == NULL || !space_is_temporary(stmt->space)) { if (txn_add_redo(stmt, request) != 0) goto fail; - ++txn->n_rows; + assert(stmt->row != NULL); + if (stmt->row->replica_id == 0) + ++txn->n_local_rows; + else + ++txn->n_remote_rows; } /* * If there are triggers, and they are not disabled, and @@ -264,21 +273,27 @@ fail: static int64_t txn_write_to_wal(struct txn *txn) { - assert(txn->n_rows > 0); + assert(txn->n_local_rows + txn->n_remote_rows > 0); - struct journal_entry *req = journal_entry_new(txn->n_rows); + struct journal_entry *req = journal_entry_new(txn->n_local_rows + + txn->n_remote_rows); if (req == NULL) return -1; struct txn_stmt *stmt; - struct xrow_header **row = req->rows; + struct xrow_header **remote_row = req->rows; + 
struct xrow_header **local_row = req->rows + txn->n_remote_rows; stailq_foreach_entry(stmt, &txn->stmts, next) { if (stmt->row == NULL) continue; /* A read (e.g. select) request */ - *row++ = stmt->row; + if (stmt->row->replica_id == 0) + *local_row++ = stmt->row; + else + *remote_row++ = stmt->row; req->approx_len += xrow_approx_len(stmt->row); } - assert(row == req->rows + req->n_rows); + assert(remote_row == req->rows + txn->n_remote_rows); + assert(local_row == remote_row + txn->n_local_rows); ev_tstamp start = ev_monotonic_now(loop()); int64_t res = journal_write(req); @@ -296,9 +311,10 @@ txn_write_to_wal(struct txn *txn) diag_set(ClientError, ER_WAL_IO); diag_log(); } else if (stop - start > too_long_threshold) { + int n_rows = txn->n_local_rows + txn->n_remote_rows; say_warn_ratelimited("too long WAL write: %d rows at " - "LSN %lld: %.3f sec", txn->n_rows, - res - txn->n_rows + 1, stop - start); + "LSN %lld: %.3f sec", n_rows, + res - n_rows + 1, stop - start); } /* * Use vclock_sum() from WAL writer as transaction signature. @@ -331,7 +347,7 @@ txn_commit(struct txn *txn) goto fail; } - if (txn->n_rows > 0) { + if (txn->n_local_rows + txn->n_remote_rows > 0) { txn->signature = txn_write_to_wal(txn); if (txn->signature < 0) goto fail; diff --git a/src/box/txn.h b/src/box/txn.h index de5cb0de4..c9829da9e 100644 --- a/src/box/txn.h +++ b/src/box/txn.h @@ -140,8 +140,10 @@ struct txn { int64_t id; /** List of statements in a transaction. */ struct stailq stmts; - /** Total number of WAL rows in this txn. */ - int n_rows; + /** Number of new rows without an assigned lsn. */ + int n_local_rows; + /** Number of rows with an already assigned lsn. */ + int n_remote_rows; /** * True if this transaction is running in autocommit mode * (statement end causes an automatic transaction commit). -- 2.21.0