Tarantool development patches archive
 help / color / mirror / Atom feed
From: Serge Petrenko <sergepetrenko@tarantool.org>
To: Cyrill Gorcunov <gorcunov@gmail.com>
Cc: tarantool-patches@dev.tarantool.org, v.shpilevoy@tarantool.org
Subject: Re: [Tarantool-patches] [PATCH 2/2] replication: make relay send txs in batches
Date: Tue, 19 May 2020 19:24:46 +0300	[thread overview]
Message-ID: <20f650d3-fcff-3117-c503-8db888cbadd3@tarantool.org> (raw)
In-Reply-To: <72f99bed-4e42-9ced-6fe8-db3f9d0f72d5@tarantool.org>


19.05.2020 15:31, Serge Petrenko пишет:
>
> 19.05.2020 14:18, Cyrill Gorcunov пишет:
>> On Tue, May 19, 2020 at 01:49:10PM +0300, Serge Petrenko wrote:
>> ...
>>>>> +    if (packet->is_commit) {
>>>>> +write:        relay_send_tx(relay, &tx_rows);
>>>>> +        tx_rows = RLIST_HEAD_INITIALIZER(tx_rows);
>>>> Why?! This is stack local variable, no?
>>> Why? It's static.
>> Yup, just found out ;) It is so hidden in the other vars
>> declaration so was not obvious (a least for me :-). I'm
>> appending the more clear approach.
>>
>> But Serge, this is my personal opinion, I'm fine with your
>> current code as well, so
>>
>> Reviewed-by: Cyrill Gorcunov <gorcunov@gmail.com>
>>
>> it is up to you to pick or drop the diff below
>
> Thanks! Looks good, applied.
>
>> ---
>>   src/box/relay.cc | 37 ++++++++++++++++++++++---------------
>>   1 file changed, 22 insertions(+), 15 deletions(-)
>>
>> diff --git a/src/box/relay.cc b/src/box/relay.cc
>> index 17b7bc667..27424e0ca 100644
>> --- a/src/box/relay.cc
>> +++ b/src/box/relay.cc
>> @@ -760,12 +760,12 @@ struct relay_tx_row {
>>   };
>>     static void
>> -relay_send_tx(struct relay *relay, struct rlist *tx_rows)
>> +relay_send_tx(struct relay *relay, struct rlist *head)
>>   {
>>       ERROR_INJECT_YIELD(ERRINJ_RELAY_SEND_DELAY);
>>         struct relay_tx_row *tx_row;
>> -    rlist_foreach_entry(tx_row, tx_rows, link) {
>> +    rlist_foreach_entry(tx_row, head, link) {
>>           tx_row->row.sync = relay->sync;
>>           coio_write_xrow(&relay->io, &tx_row->row);
>>       }
>> @@ -784,10 +784,13 @@ static void
>>   relay_collect_tx(struct xstream *stream, struct xrow_header *packet)
>>   {
>>       struct relay *relay = container_of(stream, struct relay, stream);
>> -    static RLIST_HEAD(tx_rows);
>> -    struct errinj *inj;
>>       struct relay_tx_row *tx_row;
>> +    struct errinj *inj;
>> +
>> +    static RLIST_HEAD(tx_rows_list);
>> +
>>       assert(iproto_type_is_dml(packet->type));
>> +
>>       if (packet->group_id == GROUP_LOCAL) {
>>           /*
>>            * We do not relay replica-local rows to other
>> @@ -805,20 +808,22 @@ relay_collect_tx(struct xstream *stream, struct xrow_header *packet)
>>                * in case the transaction is not fully
>>                * local.
>>                */
>> -            if (packet->is_commit && !rlist_empty(&tx_rows)) {
>> -                rlist_last_entry(&tx_rows, struct relay_tx_row,
>> -                         link)->row.is_commit = true;
>> -                goto write;
>> -            }
>> -            return;
>> +            if (!packet->is_commit || rlist_empty(&tx_rows_list))
>> +                return;
>> +
>> +            rlist_last_entry(&tx_rows_list, struct relay_tx_row,
>> +                     link)->row.is_commit = true;
>> +            goto write;
>>           }
>>           packet->type = IPROTO_NOP;
>>           packet->group_id = GROUP_DEFAULT;
>>           packet->bodycnt = 0;
>>       }
>> +
>>       /* Check if the rows from the instance are filtered. */
>> -    if ((1 << packet->replica_id & relay->id_filter) != 0)
>> +    if (((1u << packet->replica_id) & relay->id_filter) != 0)
>>           return;
>> +
>>       /*
>>        * We're feeding a WAL, thus responding to FINAL JOIN or SUBSCRIBE
>>        * request. If this is FINAL JOIN (i.e. relay->replica is NULL),
>> @@ -845,7 +850,7 @@ relay_collect_tx(struct xstream *stream, struct xrow_header *packet)
>>       }
>>         /* A short path for single-statement transactions. */
>> -    if (packet->is_commit && rlist_empty(&tx_rows)) {
>> +    if (packet->is_commit && rlist_empty(&tx_rows_list)) {
>>           relay_send(relay, packet);
>>           return;
>>       }
>> @@ -857,11 +862,13 @@ relay_collect_tx(struct xstream *stream, struct xrow_header *packet)
>>                 "struct relay_tx_row");
>>       }
>>       tx_row->row = *packet;
>> -    rlist_add_tail_entry(&tx_rows, tx_row, link);
>> +    rlist_add_tail_entry(&tx_rows_list, tx_row, link);
>>         if (packet->is_commit) {
>> -write:        relay_send_tx(relay, &tx_rows);
>> -        tx_rows = RLIST_HEAD_INITIALIZER(tx_rows);
>> +write:
>> +        relay_send_tx(relay, &tx_rows_list);
>> +        tx_rows_list = RLIST_HEAD_INITIALIZER(tx_rows_list);
>> +
>>           /*
>>            * Free all the relay_tx_rows allocated on the
>>            * fiber region.
>
This patch needs some more thought. I noticed test failures on GitLab:

https://gitlab.com/tarantool/tarantool/-/jobs/559102582

It looks like the problem is with the row bodies, which are stored in an ibuf
owned by the xlog_cursor. The cursor was designed to work in a row-by-row
manner, just like the recovery system, which calls stream_write() on each
recovered row. So the ibuf may be invalidated after recovery writes a single
row (i.e. after a single call to relay_collect_tx()).

Even though I copy the xrow_header itself, the header contains pointers to its
body, which is stored in the xlog_cursor's ibuf. That body gets overwritten as
soon as ibuf_reserve_slow() is called, so the copied header ends up pointing at
stale data.

-- 
Serge Petrenko

      reply	other threads:[~2020-05-19 16:24 UTC|newest]

Thread overview: 11+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2020-05-18 12:24 [Tarantool-patches] [PATCH 0/2] fix replication tx boundaries after local space rework Serge Petrenko
2020-05-18 12:24 ` [Tarantool-patches] [PATCH 1/2] wal: fix tx boundaries Serge Petrenko
2020-05-19  9:08   ` Cyrill Gorcunov
2020-05-19 10:30     ` Serge Petrenko
2020-05-18 12:24 ` [Tarantool-patches] [PATCH 2/2] replication: make relay send txs in batches Serge Petrenko
2020-05-18 12:28   ` Serge Petrenko
2020-05-19 10:23     ` Cyrill Gorcunov
2020-05-19 10:49       ` Serge Petrenko
2020-05-19 11:18         ` Cyrill Gorcunov
2020-05-19 12:31           ` Serge Petrenko
2020-05-19 16:24             ` Serge Petrenko [this message]

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20f650d3-fcff-3117-c503-8db888cbadd3@tarantool.org \
    --to=sergepetrenko@tarantool.org \
    --cc=gorcunov@gmail.com \
    --cc=tarantool-patches@dev.tarantool.org \
    --cc=v.shpilevoy@tarantool.org \
    --subject='Re: [Tarantool-patches] [PATCH 2/2] replication: make relay send txs in batches' \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox