Tarantool development patches archive
 help / color / mirror / Atom feed
From: Serge Petrenko <sergepetrenko@tarantool.org>
To: Cyrill Gorcunov <gorcunov@gmail.com>
Cc: tarantool-patches@dev.tarantool.org, v.shpilevoy@tarantool.org
Subject: Re: [Tarantool-patches] [PATCH 2/2] replication: make relay send txs in batches
Date: Tue, 19 May 2020 13:49:10 +0300	[thread overview]
Message-ID: <a321090e-3388-00e9-d849-2cbce6d05562@tarantool.org> (raw)
In-Reply-To: <20200519102303.GC121099@grain>


19.05.2020 13:23, Cyrill Gorcunov пишет:
> On Mon, May 18, 2020 at 03:28:46PM +0300, Serge Petrenko wrote:


Hi! Thanks for the review!

>> Sorry for the mess, the correct patch is below:
> spaces/tabs are screwed up (but since we're usually fetching from
> the branch this is not critical)
>
> ...
Yep, that's because I pasted it from the console, sorry.
>> -/** Send a single row to the client. */
>> +struct relay_tx_row {
>> +    struct rlist link;
>> +    struct xrow_header row;
>> +};
>> +
>>   static void
>> -relay_send_row(struct xstream *stream, struct xrow_header *packet)
>> +relay_send_tx(struct relay *relay, struct rlist *tx_rows)
>> +{
>> +    ERROR_INJECT_YIELD(ERRINJ_RELAY_SEND_DELAY);
>> +
>> +    struct relay_tx_row *tx_row;
>> +    rlist_foreach_entry(tx_row, tx_rows, link) {
>> +        tx_row->row.sync = relay->sync;
>> +        coio_write_xrow(&relay->io, &tx_row->row);
>> +    }
>> +    relay->last_row_time = ev_monotonic_now(loop());
>> +
>> +    struct errinj *inj = errinj(ERRINJ_RELAY_TIMEOUT, ERRINJ_DOUBLE);
>> +    if (inj != NULL && inj->dparam > 0)
>> +        fiber_sleep(inj->dparam);
>> +}
>> +
>> +/**
>> + * Collect all the rows belonging to a single transaction and
>> + * send them at once.
>> + */
>> +static void
>> +relay_collect_tx(struct xstream *stream, struct xrow_header *packet)
>>   {
>>       struct relay *relay = container_of(stream, struct relay, stream);
>> +    static RLIST_HEAD(tx_rows);
>> +    struct errinj *inj;
>> +    struct relay_tx_row *tx_row;
>>       assert(iproto_type_is_dml(packet->type));
>>       if (packet->group_id == GROUP_LOCAL) {
>>           /*
>> @@ -770,8 +798,20 @@ relay_send_row(struct xstream *stream, struct
>> xrow_header *packet)
>>            * order to correctly promote the vclock on the
>>            * replica.
>>            */
>> -        if (packet->replica_id == REPLICA_ID_NIL)
>> +        if (packet->replica_id == REPLICA_ID_NIL) {
>> +            /*
>> +             * Make sure is_commit flag from the
>> +             * local row makes it to the replica,
>> +             * in case the transaction is not fully
>> +             * local.
>> +             */
>> +            if (packet->is_commit && !rlist_empty(&tx_rows)) {
>> +                rlist_last_entry(&tx_rows, struct relay_tx_row,
>> +                         link)->row.is_commit = true;
>> +                goto write;
>> +            }
> I guess we can do it simpler
>
> 		if (!packet->is_commit || rlist_empty(&tx_rows))
> 			return;
>
> 		rlist_last_entry(&tx_rows, struct relay_tx_row,
> 			link)->row.is_commit = true;
> 		goto write;
> ?
Yes, we can, thanks. I applied your diff.
>>               return;
>> +        }
>>           packet->type = IPROTO_NOP;
>>           packet->group_id = GROUP_DEFAULT;
>>           packet->bodycnt = 0;
>> @@ -791,17 +831,41 @@ relay_send_row(struct xstream *stream, struct
>> xrow_header *packet)
>>        * it). In the latter case packet's LSN is less than or equal to
>>        * local master's LSN at the moment it received 'SUBSCRIBE' request.
>>        */
>> -    if (relay->replica == NULL ||
>> -        packet->replica_id != relay->replica->id ||
>> -        packet->lsn <= vclock_get(&relay->local_vclock_at_subscribe,
>> -                      packet->replica_id)) {
>> -        struct errinj *inj = errinj(ERRINJ_RELAY_BREAK_LSN,
>> -                        ERRINJ_INT);
>> -        if (inj != NULL && packet->lsn == inj->iparam) {
>> -            packet->lsn = inj->iparam - 1;
>> -            say_warn("injected broken lsn: %lld",
>> -                 (long long) packet->lsn);
>> -        }
>> +    if (relay->replica != NULL && packet->replica_id == relay->replica->id
>> &&
>> +        packet->lsn > vclock_get(&relay->local_vclock_at_subscribe,
>> +                     packet->replica_id)) {
>> +        return;
>> +    }
>> +
>> +    inj = errinj(ERRINJ_RELAY_BREAK_LSN, ERRINJ_INT);
>> +    if (inj != NULL && packet->lsn == inj->iparam) {
>> +        packet->lsn = inj->iparam - 1;
>> +        say_warn("injected broken lsn: %lld",
>> +             (long long) packet->lsn);
>> +    }
>> +
>> +    /* A short path for single-statement transactions. */
>> +    if (packet->is_commit && rlist_empty(&tx_rows)) {
>>           relay_send(relay, packet);
>> +        return;
>> +    }
>> +
>> +    tx_row = (struct relay_tx_row *)region_alloc(&fiber()->gc,
>> +                             sizeof(*tx_row));
>> +    if (tx_row == NULL) {
>> +        tnt_raise(OutOfMemory, sizeof(*tx_row), "region",
>> +              "struct relay_tx_row");
>> +    }
>> +    tx_row->row = *packet;
>> +    rlist_add_tail_entry(&tx_rows, tx_row, link);
>> +
>> +    if (packet->is_commit) {
>> +write:        relay_send_tx(relay, &tx_rows);
>> +        tx_rows = RLIST_HEAD_INITIALIZER(tx_rows);
> Why?! This is a stack-local variable, no?
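Why? It's static: tx_rows is declared as `static RLIST_HEAD(tx_rows)`, so it
persists across calls and has to be reset once the transaction is sent.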
>
>> +        /*
>> +         * Free all the relay_tx_rows allocated on the
>> +         * fiber region.
>> +         */
>> +        fiber_gc();
>>       }
>>   }

-- 
Serge Petrenko

  reply	other threads:[~2020-05-19 10:49 UTC|newest]

Thread overview: 11+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2020-05-18 12:24 [Tarantool-patches] [PATCH 0/2] fix replication tx boundaries after local space rework Serge Petrenko
2020-05-18 12:24 ` [Tarantool-patches] [PATCH 1/2] wal: fix tx boundaries Serge Petrenko
2020-05-19  9:08   ` Cyrill Gorcunov
2020-05-19 10:30     ` Serge Petrenko
2020-05-18 12:24 ` [Tarantool-patches] [PATCH 2/2] replication: make relay send txs in batches Serge Petrenko
2020-05-18 12:28   ` Serge Petrenko
2020-05-19 10:23     ` Cyrill Gorcunov
2020-05-19 10:49       ` Serge Petrenko [this message]
2020-05-19 11:18         ` Cyrill Gorcunov
2020-05-19 12:31           ` Serge Petrenko
2020-05-19 16:24             ` Serge Petrenko

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=a321090e-3388-00e9-d849-2cbce6d05562@tarantool.org \
    --to=sergepetrenko@tarantool.org \
    --cc=gorcunov@gmail.com \
    --cc=tarantool-patches@dev.tarantool.org \
    --cc=v.shpilevoy@tarantool.org \
    --subject='Re: [Tarantool-patches] [PATCH 2/2] replication: make relay send txs in batches' \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox