[Tarantool-patches] [PATCH v2 2/7] applier: extract tx boundary checks from applier_read_tx into a separate routine

Serge Petrenko sergepetrenko at tarantool.org
Wed Mar 24 15:24:12 MSK 2021


Introduce a new routine, set_next_tx_row(), which checks tx boundary
violation and appends the new row to the current tx in case everything
is ok.

set_next_tx_row() is extracted from applier_read_tx() because it's a
common part of transaction assembly both for recovery and applier.

The only difference for recovery will be that the routine which's
responsible for tx assembly won't read rows. It'll be a callback ran on
each new row being read from WAL.

Prerequisite #5874
Part-of #5566
---
 src/box/applier.cc | 117 +++++++++++++++++++++++----------------------
 1 file changed, 60 insertions(+), 57 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 326cf18d2..65afa5e98 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -657,6 +657,64 @@ applier_read_tx_row(struct applier *applier)
 	return tx_row;
 }
 
+static inline int64_t
+set_next_tx_row(struct stailq *rows, struct applier_tx_row *tx_row, int64_t tsn)
+{
+	struct xrow_header *row = &tx_row->row;
+
+	if (iproto_type_is_error(row->type))
+		xrow_decode_error_xc(row);
+
+	/* Replication request. */
+	if (row->replica_id >= VCLOCK_MAX) {
+		/*
+		 * A safety net, this can only occur if we're fed a strangely
+		 * broken xlog. row->replica_id == 0, when reading heartbeats
+		 * from an anonymous instance.
+		 */
+		tnt_raise(ClientError, ER_UNKNOWN_REPLICA,
+			  int2str(row->replica_id),
+			  tt_uuid_str(&REPLICASET_UUID));
+	}
+	if (tsn == 0) {
+		/*
+		 * Transaction id must be derived from the log sequence number
+		 * of the first row in the transaction.
+		 */
+		tsn = row->tsn;
+		if (row->lsn != tsn)
+			tnt_raise(ClientError, ER_PROTOCOL,
+				  "Transaction id must be equal to LSN of the "
+				  "first row in the transaction.");
+	} else if (tsn != row->tsn) {
+		tnt_raise(ClientError, ER_UNSUPPORTED, "replication",
+			  "interleaving transactions");
+	}
+
+	assert(row->bodycnt <= 1);
+	if (row->is_commit) {
+		/* Signal the caller that we've reached the tx end. */
+		tsn = 0;
+	} else if (row->bodycnt == 1) {
+		/*
+		 * Save row body to gc region. Not done for single-statement
+		 * transactions and the last row of multi-statement transactions
+		 * knowing that the input buffer will not be used while the
+		 * transaction is applied.
+		 */
+		void *new_base = region_alloc(&fiber()->gc, row->body->iov_len);
+		if (new_base == NULL)
+			tnt_raise(OutOfMemory, row->body->iov_len, "region",
+				  "xrow body");
+		memcpy(new_base, row->body->iov_base, row->body->iov_len);
+		/* Adjust row body pointers. */
+		row->body->iov_base = new_base;
+	}
+
+	stailq_add_tail(rows, &tx_row->next);
+	return tsn;
+}
+
 /**
  * Read one transaction from network using applier's input buffer.
  * Transaction rows are placed onto fiber gc region.
@@ -672,63 +730,8 @@ applier_read_tx(struct applier *applier, struct stailq *rows)
 	stailq_create(rows);
 	do {
 		struct applier_tx_row *tx_row = applier_read_tx_row(applier);
-		struct xrow_header *row = &tx_row->row;
-
-		if (iproto_type_is_error(row->type))
-			xrow_decode_error_xc(row);
-
-		/* Replication request. */
-		if (row->replica_id >= VCLOCK_MAX) {
-			/*
-			 * A safety net, this can only occur
-			 * if we're fed a strangely broken xlog.
-			 * row->replica_id == 0, when reading
-			 * heartbeats from an anonymous instance.
-			 */
-			tnt_raise(ClientError, ER_UNKNOWN_REPLICA,
-				  int2str(row->replica_id),
-				  tt_uuid_str(&REPLICASET_UUID));
-		}
-		if (tsn == 0) {
-			/*
-			 * Transaction id must be derived from the log sequence
-			 * number of the first row in the transaction.
-			 */
-			tsn = row->tsn;
-			if (row->lsn != tsn)
-				tnt_raise(ClientError, ER_PROTOCOL,
-					  "Transaction id must be equal to "
-					  "LSN of the first row in the "
-					  "transaction.");
-		}
-		if (tsn != row->tsn)
-			tnt_raise(ClientError, ER_UNSUPPORTED,
-				  "replication",
-				  "interleaving transactions");
-
-		assert(row->bodycnt <= 1);
-		if (row->bodycnt == 1 && !row->is_commit) {
-			/*
-			 * Save row body to gc region.
-			 * Not done for single-statement
-			 * transactions knowing that the input
-			 * buffer will not be used while the
-			 * transaction is applied.
-			 */
-			void *new_base = region_alloc(&fiber()->gc,
-						      row->body->iov_len);
-			if (new_base == NULL)
-				tnt_raise(OutOfMemory, row->body->iov_len,
-					  "region", "xrow body");
-			memcpy(new_base, row->body->iov_base,
-			       row->body->iov_len);
-			/* Adjust row body pointers. */
-			row->body->iov_base = new_base;
-		}
-		stailq_add_tail(rows, &tx_row->next);
-
-	} while (!stailq_last_entry(rows, struct applier_tx_row,
-				    next)->row.is_commit);
+		tsn = set_next_tx_row(rows, tx_row, tsn);
+	} while (tsn != 0);
 }
 
 static void
-- 
2.24.3 (Apple Git-128)



More information about the Tarantool-patches mailing list