[tarantool-patches] [PATCH v3 1/2] Write rows without a lsn to the transaction tail

Georgy Kirichenko georgy at tarantool.org
Sun Mar 10 23:21:27 MSK 2019


Form a separate transaction with local changes in case of replication.
This is important because we should be able to replicate such changes
(e.g. made within an on_replace trigger) back. In the opposite case
local changes will be incorporated into originating transaction and
would be skipped by the originator replica.

Needed for: #2798
---
 src/box/txn.c | 42 +++++++++++++++++++++++++++++-------------
 src/box/txn.h |  6 ++++--
 2 files changed, 33 insertions(+), 15 deletions(-)

diff --git a/src/box/txn.c b/src/box/txn.c
index 1f488bbcc..d3d008c25 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -118,9 +118,13 @@ txn_rollback_to_svp(struct txn *txn, struct stailq_entry *svp)
 	stailq_foreach_entry(stmt, &rollback, next) {
 		if (txn->engine != NULL && stmt->space != NULL)
 			engine_rollback_statement(txn->engine, txn, stmt);
-		if (stmt->row != NULL) {
-			assert(txn->n_rows > 0);
-			txn->n_rows--;
+		if (stmt->row != NULL && stmt->row->replica_id == 0) {
+			assert(txn->n_local_rows > 0);
+			txn->n_local_rows--;
+		}
+		if (stmt->row != NULL && stmt->row->replica_id != 0) {
+			assert(txn->n_remote_rows > 0);
+			txn->n_remote_rows--;
 		}
 		txn_stmt_unref_tuples(stmt);
 		stmt->space = NULL;
@@ -140,7 +144,8 @@ txn_begin(bool is_autocommit)
 	}
 	/* Initialize members explicitly to save time on memset() */
 	stailq_create(&txn->stmts);
-	txn->n_rows = 0;
+	txn->n_local_rows = 0;
+	txn->n_remote_rows = 0;
 	txn->is_autocommit = is_autocommit;
 	txn->has_triggers  = false;
 	txn->is_aborted = false;
@@ -233,7 +238,11 @@ txn_commit_stmt(struct txn *txn, struct request *request)
 	if (stmt->space == NULL || !space_is_temporary(stmt->space)) {
 		if (txn_add_redo(stmt, request) != 0)
 			goto fail;
-		++txn->n_rows;
+		assert(stmt->row != NULL);
+		if (stmt->row->replica_id == 0)
+			++txn->n_local_rows;
+		else
+			++txn->n_remote_rows;
 	}
 	/*
 	 * If there are triggers, and they are not disabled, and
@@ -264,21 +273,27 @@ fail:
 static int64_t
 txn_write_to_wal(struct txn *txn)
 {
-	assert(txn->n_rows > 0);
+	assert(txn->n_local_rows + txn->n_remote_rows > 0);
 
-	struct journal_entry *req = journal_entry_new(txn->n_rows);
+	struct journal_entry *req = journal_entry_new(txn->n_local_rows +
+						      txn->n_remote_rows);
 	if (req == NULL)
 		return -1;
 
 	struct txn_stmt *stmt;
-	struct xrow_header **row = req->rows;
+	struct xrow_header **remote_row = req->rows;
+	struct xrow_header **local_row = req->rows + txn->n_remote_rows;
 	stailq_foreach_entry(stmt, &txn->stmts, next) {
 		if (stmt->row == NULL)
 			continue; /* A read (e.g. select) request */
-		*row++ = stmt->row;
+		if (stmt->row->replica_id == 0)
+			*local_row++ = stmt->row;
+		else
+			*remote_row++ = stmt->row;
 		req->approx_len += xrow_approx_len(stmt->row);
 	}
-	assert(row == req->rows + req->n_rows);
+	assert(remote_row == req->rows + txn->n_remote_rows);
+	assert(local_row == remote_row + txn->n_local_rows);
 
 	ev_tstamp start = ev_monotonic_now(loop());
 	int64_t res = journal_write(req);
@@ -296,9 +311,10 @@ txn_write_to_wal(struct txn *txn)
 		diag_set(ClientError, ER_WAL_IO);
 		diag_log();
 	} else if (stop - start > too_long_threshold) {
+		int n_rows = txn->n_local_rows + txn->n_remote_rows;
 		say_warn_ratelimited("too long WAL write: %d rows at "
-				     "LSN %lld: %.3f sec", txn->n_rows,
-				     res - txn->n_rows + 1, stop - start);
+				     "LSN %lld: %.3f sec", n_rows,
+				     res - n_rows + 1, stop - start);
 	}
 	/*
 	 * Use vclock_sum() from WAL writer as transaction signature.
@@ -331,7 +347,7 @@ txn_commit(struct txn *txn)
 			goto fail;
 	}
 
-	if (txn->n_rows > 0) {
+	if (txn->n_local_rows + txn->n_remote_rows > 0) {
 		txn->signature = txn_write_to_wal(txn);
 		if (txn->signature < 0)
 			goto fail;
diff --git a/src/box/txn.h b/src/box/txn.h
index de5cb0de4..c9829da9e 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -140,8 +140,10 @@ struct txn {
 	int64_t id;
 	/** List of statements in a transaction. */
 	struct stailq stmts;
-	/** Total number of WAL rows in this txn. */
-	int n_rows;
+	/** Number of new rows without an assigned lsn. */
+	int n_local_rows;
+	/** Number of rows with an already assigned lsn. */
+	int n_remote_rows;
 	/**
 	 * True if this transaction is running in autocommit mode
 	 * (statement end causes an automatic transaction commit).
-- 
2.21.0





More information about the Tarantool-patches mailing list