[Tarantool-patches] [PATCH v2 2/7] applier: extract tx boundary checks from applier_read_tx into a separate routine
Serge Petrenko
sergepetrenko at tarantool.org
Wed Mar 24 15:24:12 MSK 2021
Introduce a new routine, set_next_tx_row(), which checks tx boundary
violation and appends the new row to the current tx in case everything
is ok.
set_next_tx_row() is extracted from applier_read_tx() because it's a
common part of transaction assembly both for recovery and applier.
The only difference for recovery will be that the routine which's
responsible for tx assembly won't read rows. It'll be a callback ran on
each new row being read from WAL.
Prerequisite #5874
Part-of #5566
---
src/box/applier.cc | 117 +++++++++++++++++++++++----------------------
1 file changed, 60 insertions(+), 57 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 326cf18d2..65afa5e98 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -657,6 +657,64 @@ applier_read_tx_row(struct applier *applier)
return tx_row;
}
+static inline int64_t
+set_next_tx_row(struct stailq *rows, struct applier_tx_row *tx_row, int64_t tsn)
+{
+ struct xrow_header *row = &tx_row->row;
+
+ if (iproto_type_is_error(row->type))
+ xrow_decode_error_xc(row);
+
+ /* Replication request. */
+ if (row->replica_id >= VCLOCK_MAX) {
+ /*
+ * A safety net, this can only occur if we're fed a strangely
+ * broken xlog. row->replica_id == 0, when reading heartbeats
+ * from an anonymous instance.
+ */
+ tnt_raise(ClientError, ER_UNKNOWN_REPLICA,
+ int2str(row->replica_id),
+ tt_uuid_str(&REPLICASET_UUID));
+ }
+ if (tsn == 0) {
+ /*
+ * Transaction id must be derived from the log sequence number
+ * of the first row in the transaction.
+ */
+ tsn = row->tsn;
+ if (row->lsn != tsn)
+ tnt_raise(ClientError, ER_PROTOCOL,
+ "Transaction id must be equal to LSN of the "
+ "first row in the transaction.");
+ } else if (tsn != row->tsn) {
+ tnt_raise(ClientError, ER_UNSUPPORTED, "replication",
+ "interleaving transactions");
+ }
+
+ assert(row->bodycnt <= 1);
+ if (row->is_commit) {
+ /* Signal the caller that we've reached the tx end. */
+ tsn = 0;
+ } else if (row->bodycnt == 1) {
+ /*
+ * Save row body to gc region. Not done for single-statement
+ * transactions and the last row of multi-statement transactions
+ * knowing that the input buffer will not be used while the
+ * transaction is applied.
+ */
+ void *new_base = region_alloc(&fiber()->gc, row->body->iov_len);
+ if (new_base == NULL)
+ tnt_raise(OutOfMemory, row->body->iov_len, "region",
+ "xrow body");
+ memcpy(new_base, row->body->iov_base, row->body->iov_len);
+ /* Adjust row body pointers. */
+ row->body->iov_base = new_base;
+ }
+
+ stailq_add_tail(rows, &tx_row->next);
+ return tsn;
+}
+
/**
* Read one transaction from network using applier's input buffer.
* Transaction rows are placed onto fiber gc region.
@@ -672,63 +730,8 @@ applier_read_tx(struct applier *applier, struct stailq *rows)
stailq_create(rows);
do {
struct applier_tx_row *tx_row = applier_read_tx_row(applier);
- struct xrow_header *row = &tx_row->row;
-
- if (iproto_type_is_error(row->type))
- xrow_decode_error_xc(row);
-
- /* Replication request. */
- if (row->replica_id >= VCLOCK_MAX) {
- /*
- * A safety net, this can only occur
- * if we're fed a strangely broken xlog.
- * row->replica_id == 0, when reading
- * heartbeats from an anonymous instance.
- */
- tnt_raise(ClientError, ER_UNKNOWN_REPLICA,
- int2str(row->replica_id),
- tt_uuid_str(&REPLICASET_UUID));
- }
- if (tsn == 0) {
- /*
- * Transaction id must be derived from the log sequence
- * number of the first row in the transaction.
- */
- tsn = row->tsn;
- if (row->lsn != tsn)
- tnt_raise(ClientError, ER_PROTOCOL,
- "Transaction id must be equal to "
- "LSN of the first row in the "
- "transaction.");
- }
- if (tsn != row->tsn)
- tnt_raise(ClientError, ER_UNSUPPORTED,
- "replication",
- "interleaving transactions");
-
- assert(row->bodycnt <= 1);
- if (row->bodycnt == 1 && !row->is_commit) {
- /*
- * Save row body to gc region.
- * Not done for single-statement
- * transactions knowing that the input
- * buffer will not be used while the
- * transaction is applied.
- */
- void *new_base = region_alloc(&fiber()->gc,
- row->body->iov_len);
- if (new_base == NULL)
- tnt_raise(OutOfMemory, row->body->iov_len,
- "region", "xrow body");
- memcpy(new_base, row->body->iov_base,
- row->body->iov_len);
- /* Adjust row body pointers. */
- row->body->iov_base = new_base;
- }
- stailq_add_tail(rows, &tx_row->next);
-
- } while (!stailq_last_entry(rows, struct applier_tx_row,
- next)->row.is_commit);
+ tsn = set_next_tx_row(rows, tx_row, tsn);
+ } while (tsn != 0);
}
static void
--
2.24.3 (Apple Git-128)
More information about the Tarantool-patches
mailing list