From: Serge Petrenko
To: Cyrill Gorcunov
Cc: tarantool-patches@dev.tarantool.org, v.shpilevoy@tarantool.org
Date: Tue, 19 May 2020 19:24:46 +0300
Subject: Re: [Tarantool-patches] [PATCH 2/2] replication: make relay send txs in batches
Message-ID: <20f650d3-fcff-3117-c503-8db888cbadd3@tarantool.org>
In-Reply-To: <72f99bed-4e42-9ced-6fe8-db3f9d0f72d5@tarantool.org>
References: <5428970e5b1574f456870fd7dd69ef67b46383c5.1589804001.git.sergepetrenko@tarantool.org>
 <1b20e028-c6a8-a717-adfb-1737af36cf65@tarantool.org>
 <20200519102303.GC121099@grain>
 <20200519111802.GA193176@grain>
 <72f99bed-4e42-9ced-6fe8-db3f9d0f72d5@tarantool.org>
List-Id: Tarantool development patches

19.05.2020 15:31, Serge Petrenko wrote:
>
> 19.05.2020 14:18, Cyrill Gorcunov wrote:
>> On Tue, May 19, 2020 at 01:49:10PM +0300, Serge Petrenko wrote:
>> ...
>>>>> +    if (packet->is_commit) {
>>>>> +write:        relay_send_tx(relay, &tx_rows);
>>>>> +        tx_rows = RLIST_HEAD_INITIALIZER(tx_rows);
>>>> Why?! This is stack local variable, no?
>>> Why? It's static.
>> Yup, just found out ;) It is so hidden in the other vars
>> declaration so was not obvious (at least for me :-). I'm
>> appending the more clear approach.
>>
>> But Serge, this is my personal opinion, I'm fine with your
>> current code as well, so
>>
>> Reviewed-by: Cyrill Gorcunov
>>
>> it is up to you to pick or drop the diff below
>
> Thanks! Looks good, applied.
>
>> ---
>>   src/box/relay.cc | 37 ++++++++++++++++++++++---------------
>>   1 file changed, 22 insertions(+), 15 deletions(-)
>>
>> diff --git a/src/box/relay.cc b/src/box/relay.cc
>> index 17b7bc667..27424e0ca 100644
>> --- a/src/box/relay.cc
>> +++ b/src/box/relay.cc
>> @@ -760,12 +760,12 @@ struct relay_tx_row {
>>   };
>>
>>   static void
>> -relay_send_tx(struct relay *relay, struct rlist *tx_rows)
>> +relay_send_tx(struct relay *relay, struct rlist *head)
>>   {
>>       ERROR_INJECT_YIELD(ERRINJ_RELAY_SEND_DELAY);
>>
>>       struct relay_tx_row *tx_row;
>> -    rlist_foreach_entry(tx_row, tx_rows, link) {
>> +    rlist_foreach_entry(tx_row, head, link) {
>>           tx_row->row.sync = relay->sync;
>>           coio_write_xrow(&relay->io, &tx_row->row);
>>       }
>> @@ -784,10 +784,13 @@ static void
>>   relay_collect_tx(struct xstream *stream, struct xrow_header *packet)
>>   {
>>       struct relay *relay = container_of(stream, struct relay, stream);
>> -    static RLIST_HEAD(tx_rows);
>> -    struct errinj *inj;
>>       struct relay_tx_row *tx_row;
>> +    struct errinj *inj;
>> +
>> +    static RLIST_HEAD(tx_rows_list);
>> +
>>       assert(iproto_type_is_dml(packet->type));
>> +
>>       if (packet->group_id == GROUP_LOCAL) {
>>           /*
>>            * We do not relay replica-local rows to other
>> @@ -805,20 +808,22 @@ relay_collect_tx(struct xstream *stream, struct xrow_header *packet)
>>                * in case the transaction is not fully
>>                * local.
>>                */
>> -            if (packet->is_commit && !rlist_empty(&tx_rows)) {
>> -                rlist_last_entry(&tx_rows, struct relay_tx_row,
>> -                         link)->row.is_commit = true;
>> -                goto write;
>> -            }
>> -            return;
>> +            if (!packet->is_commit || rlist_empty(&tx_rows_list))
>> +                return;
>> +
>> +            rlist_last_entry(&tx_rows_list, struct relay_tx_row,
>> +                     link)->row.is_commit = true;
>> +            goto write;
>>           }
>>           packet->type = IPROTO_NOP;
>>           packet->group_id = GROUP_DEFAULT;
>>           packet->bodycnt = 0;
>>       }
>> +
>>       /* Check if the rows from the instance are filtered. */
>> -    if ((1 << packet->replica_id & relay->id_filter) != 0)
>> +    if (((1u << packet->replica_id) & relay->id_filter) != 0)
>>           return;
>> +
>>       /*
>>        * We're feeding a WAL, thus responding to FINAL JOIN or SUBSCRIBE
>>        * request. If this is FINAL JOIN (i.e. relay->replica is NULL),
>> @@ -845,7 +850,7 @@ relay_collect_tx(struct xstream *stream, struct xrow_header *packet)
>>       }
>>
>>       /* A short path for single-statement transactions. */
>> -    if (packet->is_commit && rlist_empty(&tx_rows)) {
>> +    if (packet->is_commit && rlist_empty(&tx_rows_list)) {
>>           relay_send(relay, packet);
>>           return;
>>       }
>> @@ -857,11 +862,13 @@ relay_collect_tx(struct xstream *stream, struct xrow_header *packet)
>>                 "struct relay_tx_row");
>>       }
>>       tx_row->row = *packet;
>> -    rlist_add_tail_entry(&tx_rows, tx_row, link);
>> +    rlist_add_tail_entry(&tx_rows_list, tx_row, link);
>>
>>       if (packet->is_commit) {
>> -write:        relay_send_tx(relay, &tx_rows);
>> -        tx_rows = RLIST_HEAD_INITIALIZER(tx_rows);
>> +write:
>> +        relay_send_tx(relay, &tx_rows_list);
>> +        tx_rows_list = RLIST_HEAD_INITIALIZER(tx_rows_list);
>> +
>>           /*
>>            * Free all the relay_tx_rows allocated on the
>>            * fiber region.
>

This patch needs some more thought. I noticed test failures on GitLab:
https://gitlab.com/tarantool/tarantool/-/jobs/559102582

It looks like the problem is with row bodies, which are stored in an
ibuf owned by xlog_cursor. The cursor was designed to work in a
row-by-row manner, just like the recovery system, which calls
stream_write() on each recovered row. So the ibuf may be invalidated
after recovery writes a single row (i.e. after a single call to
relay_collect_tx()).

Even though I copy the xrow_header itself, the header contains pointers
to its body, which is stored in the xlog_cursor's ibuf, and that ibuf
gets rewritten as soon as ibuf_reserve_slow() is called.

-- 
Serge Petrenko
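
To illustrate the hazard described above, here is a minimal sketch in
plain C. It is not Tarantool code: struct row, struct reader,
reader_next() and row_dup() are hypothetical stand-ins for xrow_header,
xlog_cursor with its ibuf, and a possible deep-copy helper. It only
assumes what the message states: copying the header copies pointers, not
the body they point to.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for xrow_header: it does not own its body. */
struct row {
	int is_commit;
	const char *body;	/* points into the reader's buffer */
	size_t body_len;
};

/* Hypothetical stand-in for the cursor: one buffer reused for every row. */
struct reader {
	char buf[64];
};

/* Decode the next row into the shared buffer, overwriting the previous one. */
static struct row
reader_next(struct reader *r, const char *payload, int is_commit)
{
	size_t len = strlen(payload);
	assert(len < sizeof(r->buf));
	memcpy(r->buf, payload, len + 1);
	struct row row = { is_commit, r->buf, len };
	return row;
}

/* Deep copy: duplicate the body so that the saved row owns its data. */
static struct row
row_dup(const struct row *src)
{
	struct row dst = *src;
	char *body = malloc(src->body_len + 1);
	assert(body != NULL);
	memcpy(body, src->body, src->body_len + 1);
	dst.body = body;
	return dst;
}

/* Returns 1 if a plain struct copy still sees the first row's body. */
static int
shallow_copy_survives(void)
{
	struct reader r;
	struct row first = reader_next(&r, "insert 1", 0);
	struct row saved = first;		/* copies the pointer only */
	reader_next(&r, "insert 2", 1);		/* buffer is overwritten */
	return strcmp(saved.body, "insert 1") == 0;
}

/* Returns 1 if a deep copy still sees the first row's body. */
static int
deep_copy_survives(void)
{
	struct reader r;
	struct row first = reader_next(&r, "insert 1", 0);
	struct row saved = row_dup(&first);	/* copies the body too */
	reader_next(&r, "insert 2", 1);
	int ok = strcmp(saved.body, "insert 1") == 0;
	free((void *)saved.body);
	return ok;
}
```

In this sketch shallow_copy_survives() returns 0 and deep_copy_survives()
returns 1. Whether copying bodies is the right fix for the relay, as
opposed to changing how the cursor's buffer is managed, is a separate
design question that the sketch does not answer.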