[tarantool-patches] [PATCH v3 1/2] Write rows without a lsn to the transaction tail
Georgy Kirichenko
georgy at tarantool.org
Sun Mar 10 23:21:27 MSK 2019
Form a separate transaction with local changes in case of replication.
This is important because we should be able to replicate such changes
(e.g. those made within an on_replace trigger) back. Otherwise, local
changes would be incorporated into the originating transaction and
would be skipped by the originator replica.
Needed for: #2798
---
src/box/txn.c | 42 +++++++++++++++++++++++++++++-------------
src/box/txn.h | 6 ++++--
2 files changed, 33 insertions(+), 15 deletions(-)
diff --git a/src/box/txn.c b/src/box/txn.c
index 1f488bbcc..d3d008c25 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -118,9 +118,13 @@ txn_rollback_to_svp(struct txn *txn, struct stailq_entry *svp)
stailq_foreach_entry(stmt, &rollback, next) {
if (txn->engine != NULL && stmt->space != NULL)
engine_rollback_statement(txn->engine, txn, stmt);
- if (stmt->row != NULL) {
- assert(txn->n_rows > 0);
- txn->n_rows--;
+ if (stmt->row != NULL && stmt->row->replica_id == 0) {
+ assert(txn->n_local_rows > 0);
+ txn->n_local_rows--;
+ }
+ if (stmt->row != NULL && stmt->row->replica_id != 0) {
+ assert(txn->n_remote_rows > 0);
+ txn->n_remote_rows--;
}
txn_stmt_unref_tuples(stmt);
stmt->space = NULL;
@@ -140,7 +144,8 @@ txn_begin(bool is_autocommit)
}
/* Initialize members explicitly to save time on memset() */
stailq_create(&txn->stmts);
- txn->n_rows = 0;
+ txn->n_local_rows = 0;
+ txn->n_remote_rows = 0;
txn->is_autocommit = is_autocommit;
txn->has_triggers = false;
txn->is_aborted = false;
@@ -233,7 +238,11 @@ txn_commit_stmt(struct txn *txn, struct request *request)
if (stmt->space == NULL || !space_is_temporary(stmt->space)) {
if (txn_add_redo(stmt, request) != 0)
goto fail;
- ++txn->n_rows;
+ assert(stmt->row != NULL);
+ if (stmt->row->replica_id == 0)
+ ++txn->n_local_rows;
+ else
+ ++txn->n_remote_rows;
}
/*
* If there are triggers, and they are not disabled, and
@@ -264,21 +273,27 @@ fail:
static int64_t
txn_write_to_wal(struct txn *txn)
{
- assert(txn->n_rows > 0);
+ assert(txn->n_local_rows + txn->n_remote_rows > 0);
- struct journal_entry *req = journal_entry_new(txn->n_rows);
+ struct journal_entry *req = journal_entry_new(txn->n_local_rows +
+ txn->n_remote_rows);
if (req == NULL)
return -1;
struct txn_stmt *stmt;
- struct xrow_header **row = req->rows;
+ struct xrow_header **remote_row = req->rows;
+ struct xrow_header **local_row = req->rows + txn->n_remote_rows;
stailq_foreach_entry(stmt, &txn->stmts, next) {
if (stmt->row == NULL)
continue; /* A read (e.g. select) request */
- *row++ = stmt->row;
+ if (stmt->row->replica_id == 0)
+ *local_row++ = stmt->row;
+ else
+ *remote_row++ = stmt->row;
req->approx_len += xrow_approx_len(stmt->row);
}
- assert(row == req->rows + req->n_rows);
+ assert(remote_row == req->rows + txn->n_remote_rows);
+ assert(local_row == remote_row + txn->n_local_rows);
ev_tstamp start = ev_monotonic_now(loop());
int64_t res = journal_write(req);
@@ -296,9 +311,10 @@ txn_write_to_wal(struct txn *txn)
diag_set(ClientError, ER_WAL_IO);
diag_log();
} else if (stop - start > too_long_threshold) {
+ int n_rows = txn->n_local_rows + txn->n_remote_rows;
say_warn_ratelimited("too long WAL write: %d rows at "
- "LSN %lld: %.3f sec", txn->n_rows,
- res - txn->n_rows + 1, stop - start);
+ "LSN %lld: %.3f sec", n_rows,
+ res - n_rows + 1, stop - start);
}
/*
* Use vclock_sum() from WAL writer as transaction signature.
@@ -331,7 +347,7 @@ txn_commit(struct txn *txn)
goto fail;
}
- if (txn->n_rows > 0) {
+ if (txn->n_local_rows + txn->n_remote_rows > 0) {
txn->signature = txn_write_to_wal(txn);
if (txn->signature < 0)
goto fail;
diff --git a/src/box/txn.h b/src/box/txn.h
index de5cb0de4..c9829da9e 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -140,8 +140,10 @@ struct txn {
int64_t id;
/** List of statements in a transaction. */
struct stailq stmts;
- /** Total number of WAL rows in this txn. */
- int n_rows;
+ /** Number of new rows without an assigned lsn. */
+ int n_local_rows;
+ /** Number of rows with an already assigned lsn. */
+ int n_remote_rows;
/**
* True if this transaction is running in autocommit mode
* (statement end causes an automatic transaction commit).
--
2.21.0
More information about the Tarantool-patches
mailing list