From: Serge Petrenko
Date: Tue, 19 May 2020 15:31:30 +0300
Subject: Re: [Tarantool-patches] [PATCH 2/2] replication: make relay send txs in batches
To: Cyrill Gorcunov
Cc: tarantool-patches@dev.tarantool.org, v.shpilevoy@tarantool.org
Message-ID: <72f99bed-4e42-9ced-6fe8-db3f9d0f72d5@tarantool.org>
In-Reply-To: <20200519111802.GA193176@grain>

On 19.05.2020 14:18, Cyrill Gorcunov wrote:
> On Tue, May 19, 2020 at 01:49:10PM +0300, Serge Petrenko wrote:
> ...
>>>> +    if (packet->is_commit) {
>>>> +write:        relay_send_tx(relay, &tx_rows);
>>>> +        tx_rows = RLIST_HEAD_INITIALIZER(tx_rows);
>>> Why?! This is stack local variable, no?
>> Why? It's static.
> Yup, just found out ;) It is so hidden in the other vars
> declaration so was not obvious (at least for me :-). I'm
> appending the more clear approach.
>
> But Serge, this is my personal opinion, I'm fine with your
> current code as well, so
>
> Reviewed-by: Cyrill Gorcunov
>
> it is up to you to pick or drop the diff below

Thanks! Looks good, applied.

> ---
>  src/box/relay.cc | 37 ++++++++++++++++++++++---------------
>  1 file changed, 22 insertions(+), 15 deletions(-)
>
> diff --git a/src/box/relay.cc b/src/box/relay.cc
> index 17b7bc667..27424e0ca 100644
> --- a/src/box/relay.cc
> +++ b/src/box/relay.cc
> @@ -760,12 +760,12 @@ struct relay_tx_row {
>  };
>
>  static void
> -relay_send_tx(struct relay *relay, struct rlist *tx_rows)
> +relay_send_tx(struct relay *relay, struct rlist *head)
>  {
>  	ERROR_INJECT_YIELD(ERRINJ_RELAY_SEND_DELAY);
>
>  	struct relay_tx_row *tx_row;
> -	rlist_foreach_entry(tx_row, tx_rows, link) {
> +	rlist_foreach_entry(tx_row, head, link) {
>  		tx_row->row.sync = relay->sync;
>  		coio_write_xrow(&relay->io, &tx_row->row);
>  	}
> @@ -784,10 +784,13 @@ static void
>  relay_collect_tx(struct xstream *stream, struct xrow_header *packet)
>  {
>  	struct relay *relay = container_of(stream, struct relay, stream);
> -	static RLIST_HEAD(tx_rows);
> -	struct errinj *inj;
>  	struct relay_tx_row *tx_row;
> +	struct errinj *inj;
> +
> +	static RLIST_HEAD(tx_rows_list);
> +
>  	assert(iproto_type_is_dml(packet->type));
> +
>  	if (packet->group_id == GROUP_LOCAL) {
>  		/*
>  		 * We do not relay replica-local rows to other
> @@ -805,20 +808,22 @@ relay_collect_tx(struct xstream *stream, struct xrow_header *packet)
>  			 * in case the transaction is not fully
>  			 * local.
>  			 */
> -			if (packet->is_commit && !rlist_empty(&tx_rows)) {
> -				rlist_last_entry(&tx_rows, struct relay_tx_row,
> -						 link)->row.is_commit = true;
> -				goto write;
> -			}
> -			return;
> +			if (!packet->is_commit || rlist_empty(&tx_rows_list))
> +				return;
> +
> +			rlist_last_entry(&tx_rows_list, struct relay_tx_row,
> +					 link)->row.is_commit = true;
> +			goto write;
>  		}
>  		packet->type = IPROTO_NOP;
>  		packet->group_id = GROUP_DEFAULT;
>  		packet->bodycnt = 0;
>  	}
> +
>  	/* Check if the rows from the instance are filtered. */
> -	if ((1 << packet->replica_id & relay->id_filter) != 0)
> +	if (((1u << packet->replica_id) & relay->id_filter) != 0)
>  		return;
> +
>  	/*
>  	 * We're feeding a WAL, thus responding to FINAL JOIN or SUBSCRIBE
>  	 * request. If this is FINAL JOIN (i.e. relay->replica is NULL),
> @@ -845,7 +850,7 @@ relay_collect_tx(struct xstream *stream, struct xrow_header *packet)
>  	}
>
>  	/* A short path for single-statement transactions. */
> -	if (packet->is_commit && rlist_empty(&tx_rows)) {
> +	if (packet->is_commit && rlist_empty(&tx_rows_list)) {
>  		relay_send(relay, packet);
>  		return;
>  	}
> @@ -857,11 +862,13 @@ relay_collect_tx(struct xstream *stream, struct xrow_header *packet)
>  			  "struct relay_tx_row");
>  	}
>  	tx_row->row = *packet;
> -	rlist_add_tail_entry(&tx_rows, tx_row, link);
> +	rlist_add_tail_entry(&tx_rows_list, tx_row, link);
>
>  	if (packet->is_commit) {
> -write:		relay_send_tx(relay, &tx_rows);
> -		tx_rows = RLIST_HEAD_INITIALIZER(tx_rows);
> +write:
> +		relay_send_tx(relay, &tx_rows_list);
> +		tx_rows_list = RLIST_HEAD_INITIALIZER(tx_rows_list);
> +
>  		/*
>  		 * Free all the relay_tx_rows allocated on the
>  		 * fiber region.

-- 
Serge Petrenko