From mboxrd@z Thu Jan 1 00:00:00 1970
Return-Path:
Received: from mail-lf1-f67.google.com (mail-lf1-f67.google.com [209.85.167.67])
	(using TLSv1.2 with cipher ECDHE-RSA-AES128-GCM-SHA256 (128/128 bits))
	(No client certificate requested)
	by dev.tarantool.org (Postfix) with ESMTPS id 05F60469710
	for ; Tue, 19 May 2020 14:18:05 +0300 (MSK)
Received: by mail-lf1-f67.google.com with SMTP id r125so544603lff.13
	for ; Tue, 19 May 2020 04:18:05 -0700 (PDT)
Date: Tue, 19 May 2020 14:18:02 +0300
From: Cyrill Gorcunov
Message-ID: <20200519111802.GA193176@grain>
References: <5428970e5b1574f456870fd7dd69ef67b46383c5.1589804001.git.sergepetrenko@tarantool.org>
	<1b20e028-c6a8-a717-adfb-1737af36cf65@tarantool.org>
	<20200519102303.GC121099@grain>
MIME-Version: 1.0
Content-Type: text/plain; charset="iso-8859-1"
Content-Disposition: inline
Content-Transfer-Encoding: quoted-printable
In-Reply-To:
Subject: Re: [Tarantool-patches] [PATCH 2/2] replication: make relay send txs in batches
List-Id: Tarantool development patches
List-Unsubscribe: ,
List-Archive:
List-Post:
List-Help:
List-Subscribe: ,
To: Serge Petrenko
Cc: tarantool-patches@dev.tarantool.org, v.shpilevoy@tarantool.org

On Tue, May 19, 2020 at 01:49:10PM +0300, Serge Petrenko wrote:
...
> > > +    if (packet->is_commit) {
> > > +write:        relay_send_tx(relay, &tx_rows);
> > > +        tx_rows = RLIST_HEAD_INITIALIZER(tx_rows);
> > Why?! This is stack local variable, no?
> Why? It's static.

Yup, just found out ;) It is hidden among the other variable
declarations, so it was not obvious (at least for me :-). I'm appending
a clearer approach. But Serge, this is my personal opinion; I'm fine
with your current code as well, so

Reviewed-by: Cyrill Gorcunov

It is up to you to pick or drop the diff below.
---
 src/box/relay.cc | 37 ++++++++++++++++++++++---------------
 1 file changed, 22 insertions(+), 15 deletions(-)

diff --git a/src/box/relay.cc b/src/box/relay.cc
index 17b7bc667..27424e0ca 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -760,12 +760,12 @@ struct relay_tx_row {
 };
 
 static void
-relay_send_tx(struct relay *relay, struct rlist *tx_rows)
+relay_send_tx(struct relay *relay, struct rlist *head)
 {
 	ERROR_INJECT_YIELD(ERRINJ_RELAY_SEND_DELAY);
 
 	struct relay_tx_row *tx_row;
-	rlist_foreach_entry(tx_row, tx_rows, link) {
+	rlist_foreach_entry(tx_row, head, link) {
 		tx_row->row.sync = relay->sync;
 		coio_write_xrow(&relay->io, &tx_row->row);
 	}
@@ -784,10 +784,13 @@ static void
 relay_collect_tx(struct xstream *stream, struct xrow_header *packet)
 {
 	struct relay *relay = container_of(stream, struct relay, stream);
-	static RLIST_HEAD(tx_rows);
-	struct errinj *inj;
 	struct relay_tx_row *tx_row;
+	struct errinj *inj;
+
+	static RLIST_HEAD(tx_rows_list);
+
 	assert(iproto_type_is_dml(packet->type));
+
 	if (packet->group_id == GROUP_LOCAL) {
 		/*
 		 * We do not relay replica-local rows to other
@@ -805,20 +808,22 @@ relay_collect_tx(struct xstream *stream, struct xrow_header *packet)
 			 * in case the transaction is not fully
 			 * local.
 			 */
-			if (packet->is_commit && !rlist_empty(&tx_rows)) {
-				rlist_last_entry(&tx_rows, struct relay_tx_row,
-						 link)->row.is_commit = true;
-				goto write;
-			}
-			return;
+			if (!packet->is_commit || rlist_empty(&tx_rows_list))
+				return;
+
+			rlist_last_entry(&tx_rows_list, struct relay_tx_row,
+					 link)->row.is_commit = true;
+			goto write;
 		}
 		packet->type = IPROTO_NOP;
 		packet->group_id = GROUP_DEFAULT;
 		packet->bodycnt = 0;
 	}
+
 	/* Check if the rows from the instance are filtered. */
-	if ((1 << packet->replica_id & relay->id_filter) != 0)
+	if (((1u << packet->replica_id) & relay->id_filter) != 0)
 		return;
+
 	/*
 	 * We're feeding a WAL, thus responding to FINAL JOIN or SUBSCRIBE
 	 * request. If this is FINAL JOIN (i.e. relay->replica is NULL),
@@ -845,7 +850,7 @@ relay_collect_tx(struct xstream *stream, struct xrow_header *packet)
 	}
 
 	/* A short path for single-statement transactions. */
-	if (packet->is_commit && rlist_empty(&tx_rows)) {
+	if (packet->is_commit && rlist_empty(&tx_rows_list)) {
 		relay_send(relay, packet);
 		return;
 	}
@@ -857,11 +862,13 @@ relay_collect_tx(struct xstream *stream, struct xrow_header *packet)
 			  "struct relay_tx_row");
 	}
 	tx_row->row = *packet;
-	rlist_add_tail_entry(&tx_rows, tx_row, link);
+	rlist_add_tail_entry(&tx_rows_list, tx_row, link);
 
 	if (packet->is_commit) {
-write:		relay_send_tx(relay, &tx_rows);
-		tx_rows = RLIST_HEAD_INITIALIZER(tx_rows);
+write:
+		relay_send_tx(relay, &tx_rows_list);
+		tx_rows_list = RLIST_HEAD_INITIALIZER(tx_rows_list);
+
 		/*
 		 * Free all the relay_tx_rows allocated on the
 		 * fiber region.
-- 
2.26.2
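
For anyone following the static-variable exchange above: a function-local
static keeps its contents between calls, which is why the list head has to
be reset after each flush. Below is a minimal sketch of that pattern only.
It assumes a fixed-size array in place of Tarantool's rlist, and the names
struct batch, collect_row, flush_batch, flushed_batches and last_flush_size
are hypothetical, made up for illustration; none of them come from relay.cc.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for the pending transaction rows (not rlist). */
enum { BATCH_MAX = 16 };

struct batch {
	int rows[BATCH_MAX];
	size_t count;
};

/* Bookkeeping so the effect of each flush can be observed. */
static size_t flushed_batches;
static size_t last_flush_size;

/* Pretend to send a whole batch; here we only record its size. */
static void
flush_batch(const struct batch *b)
{
	flushed_batches++;
	last_flush_size = b->count;
}

/*
 * Collect one row and flush the batch once the commit row arrives.
 * Illustrative only: the real relay_collect_tx() does much more.
 */
static void
collect_row(int row, bool is_commit)
{
	/* static: the batch survives between calls to this function. */
	static struct batch pending;

	assert(pending.count < BATCH_MAX);
	pending.rows[pending.count++] = row;

	if (is_commit) {
		flush_batch(&pending);
		/*
		 * Reset the static state. Without this the next
		 * transaction would start with the rows of this one.
		 */
		pending.count = 0;
	}
}
```

Calling collect_row() three times with the commit flag set only on the last
call flushes one batch of three rows; a following single-row commit flushes
a batch of one, which shows that the reset took effect.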