[Tarantool-patches] [PATCH 2/2] replication: make relay send txs in batches

Cyrill Gorcunov gorcunov at gmail.com
Tue May 19 14:18:02 MSK 2020


On Tue, May 19, 2020 at 01:49:10PM +0300, Serge Petrenko wrote:
...
> > > +    if (packet->is_commit) {
> > > +write:        relay_send_tx(relay, &tx_rows);
> > > +        tx_rows = RLIST_HEAD_INITIALIZER(tx_rows);
> > Why?! This is stack local variable, no?
> Why? It's static.

Yup, just found out ;) It is so hidden among the other variable
declarations that it was not obvious (at least for me :-). I'm
appending a clearer approach.

But Serge, this is just my personal opinion; I'm fine with your
current code as well, so

Reviewed-by: Cyrill Gorcunov <gorcunov at gmail.com>

it is up to you to pick or drop the diff below
---
 src/box/relay.cc | 37 ++++++++++++++++++++++---------------
 1 file changed, 22 insertions(+), 15 deletions(-)

diff --git a/src/box/relay.cc b/src/box/relay.cc
index 17b7bc667..27424e0ca 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -760,12 +760,12 @@ struct relay_tx_row {
 };
 
 static void
-relay_send_tx(struct relay *relay, struct rlist *tx_rows)
+relay_send_tx(struct relay *relay, struct rlist *head)
 {
 	ERROR_INJECT_YIELD(ERRINJ_RELAY_SEND_DELAY);
 
 	struct relay_tx_row *tx_row;
-	rlist_foreach_entry(tx_row, tx_rows, link) {
+	rlist_foreach_entry(tx_row, head, link) {
 		tx_row->row.sync = relay->sync;
 		coio_write_xrow(&relay->io, &tx_row->row);
 	}
@@ -784,10 +784,13 @@ static void
 relay_collect_tx(struct xstream *stream, struct xrow_header *packet)
 {
 	struct relay *relay = container_of(stream, struct relay, stream);
-	static RLIST_HEAD(tx_rows);
-	struct errinj *inj;
 	struct relay_tx_row *tx_row;
+	struct errinj *inj;
+
+	static RLIST_HEAD(tx_rows_list);
+
 	assert(iproto_type_is_dml(packet->type));
+
 	if (packet->group_id == GROUP_LOCAL) {
 		/*
 		 * We do not relay replica-local rows to other
@@ -805,20 +808,22 @@ relay_collect_tx(struct xstream *stream, struct xrow_header *packet)
 			 * in case the transaction is not fully
 			 * local.
 			 */
-			if (packet->is_commit && !rlist_empty(&tx_rows)) {
-				rlist_last_entry(&tx_rows, struct relay_tx_row,
-						 link)->row.is_commit = true;
-				goto write;
-			}
-			return;
+			if (!packet->is_commit || rlist_empty(&tx_rows_list))
+				return;
+
+			rlist_last_entry(&tx_rows_list, struct relay_tx_row,
+					 link)->row.is_commit = true;
+			goto write;
 		}
 		packet->type = IPROTO_NOP;
 		packet->group_id = GROUP_DEFAULT;
 		packet->bodycnt = 0;
 	}
+
 	/* Check if the rows from the instance are filtered. */
-	if ((1 << packet->replica_id & relay->id_filter) != 0)
+	if (((1u << packet->replica_id) & relay->id_filter) != 0)
 		return;
+
 	/*
 	 * We're feeding a WAL, thus responding to FINAL JOIN or SUBSCRIBE
 	 * request. If this is FINAL JOIN (i.e. relay->replica is NULL),
@@ -845,7 +850,7 @@ relay_collect_tx(struct xstream *stream, struct xrow_header *packet)
 	}
 
 	/* A short path for single-statement transactions. */
-	if (packet->is_commit && rlist_empty(&tx_rows)) {
+	if (packet->is_commit && rlist_empty(&tx_rows_list)) {
 		relay_send(relay, packet);
 		return;
 	}
@@ -857,11 +862,13 @@ relay_collect_tx(struct xstream *stream, struct xrow_header *packet)
 			  "struct relay_tx_row");
 	}
 	tx_row->row = *packet;
-	rlist_add_tail_entry(&tx_rows, tx_row, link);
+	rlist_add_tail_entry(&tx_rows_list, tx_row, link);
 
 	if (packet->is_commit) {
-write:		relay_send_tx(relay, &tx_rows);
-		tx_rows = RLIST_HEAD_INITIALIZER(tx_rows);
+write:
+		relay_send_tx(relay, &tx_rows_list);
+		tx_rows_list = RLIST_HEAD_INITIALIZER(tx_rows_list);
+
 		/*
 		 * Free all the relay_tx_rows allocated on the
 		 * fiber region.
-- 
2.26.2



More information about the Tarantool-patches mailing list